Revert "Revert "Store the subscriber, client, service and timer"" (#449)

* Revert "Revert "Store the subscriber, client, service and timer (#431)" (#448)"

This reverts commit 168d75cf1e.

* Convert all rcl_*_t types to shared pointers

Converts all rcl_*_t types in the memory allocation strategy to shared pointers to prevent crash happening when a subscriber is reset.

Issue: #349

* fixups
This commit is contained in:
William Woodall 2018-03-19 21:05:26 -07:00 committed by GitHub
parent af6e86c522
commit 9ce5aaa792
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 235 additions and 112 deletions

View file

@ -125,6 +125,7 @@ ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME}) ament_export_libraries(${PROJECT_NAME})
if(BUILD_TESTING) if(BUILD_TESTING)
find_package(ament_cmake_gtest REQUIRED)
find_package(ament_lint_auto REQUIRED) find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies() ament_lint_auto_find_test_dependencies()

View file

@ -68,11 +68,11 @@ public:
get_service_name() const; get_service_name() const;
RCLCPP_PUBLIC RCLCPP_PUBLIC
rcl_client_t * std::shared_ptr<rcl_client_t>
get_client_handle(); get_client_handle();
RCLCPP_PUBLIC RCLCPP_PUBLIC
const rcl_client_t * std::shared_ptr<const rcl_client_t>
get_client_handle() const; get_client_handle() const;
RCLCPP_PUBLIC RCLCPP_PUBLIC
@ -112,7 +112,8 @@ protected:
rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_; rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
std::shared_ptr<rcl_node_t> node_handle_; std::shared_ptr<rcl_node_t> node_handle_;
rcl_client_t client_handle_ = rcl_get_zero_initialized_client(); std::shared_ptr<rcl_client_t> client_handle_;
std::string service_name_; std::string service_name_;
}; };
@ -148,7 +149,7 @@ public:
auto service_type_support_handle = auto service_type_support_handle =
get_service_type_support_handle<ServiceT>(); get_service_type_support_handle<ServiceT>();
rcl_ret_t ret = rcl_client_init( rcl_ret_t ret = rcl_client_init(
&client_handle_, this->get_client_handle().get(),
this->get_rcl_node_handle(), this->get_rcl_node_handle(),
service_type_support_handle, service_type_support_handle,
service_name.c_str(), service_name.c_str(),
@ -170,11 +171,6 @@ public:
virtual ~Client() virtual ~Client()
{ {
if (rcl_client_fini(&client_handle_, this->get_rcl_node_handle()) != RCL_RET_OK) {
fprintf(stderr,
"Error in destruction of rcl client handle: %s\n", rcl_get_error_string_safe());
rcl_reset_error();
}
} }
std::shared_ptr<void> std::shared_ptr<void>
@ -238,7 +234,7 @@ public:
{ {
std::lock_guard<std::mutex> lock(pending_requests_mutex_); std::lock_guard<std::mutex> lock(pending_requests_mutex_);
int64_t sequence_number; int64_t sequence_number;
rcl_ret_t ret = rcl_send_request(get_client_handle(), request.get(), &sequence_number); rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number);
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request"); rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
} }

View file

@ -83,14 +83,18 @@ public:
static rclcpp::SubscriptionBase::SharedPtr static rclcpp::SubscriptionBase::SharedPtr
get_subscription_by_handle( get_subscription_by_handle(
const rcl_subscription_t * subscriber_handle, std::shared_ptr<const rcl_subscription_t> subscriber_handle,
const WeakNodeVector & weak_nodes); const WeakNodeVector & weak_nodes);
static rclcpp::ServiceBase::SharedPtr static rclcpp::ServiceBase::SharedPtr
get_service_by_handle(const rcl_service_t * service_handle, const WeakNodeVector & weak_nodes); get_service_by_handle(
std::shared_ptr<const rcl_service_t> service_handle,
const WeakNodeVector & weak_nodes);
static rclcpp::ClientBase::SharedPtr static rclcpp::ClientBase::SharedPtr
get_client_by_handle(const rcl_client_t * client_handle, const WeakNodeVector & weak_nodes); get_client_by_handle(
std::shared_ptr<const rcl_client_t> client_handle,
const WeakNodeVector & weak_nodes);
static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group( get_node_by_group(

View file

@ -30,6 +30,7 @@
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
#include "rclcpp/logging.hpp"
#include "rmw/error_handling.h" #include "rmw/error_handling.h"
#include "rmw/rmw.h" #include "rmw/rmw.h"
@ -58,11 +59,11 @@ public:
get_service_name(); get_service_name();
RCLCPP_PUBLIC RCLCPP_PUBLIC
rcl_service_t * std::shared_ptr<rcl_service_t>
get_service_handle(); get_service_handle();
RCLCPP_PUBLIC RCLCPP_PUBLIC
const rcl_service_t * std::shared_ptr<const rcl_service_t>
get_service_handle() const; get_service_handle() const;
virtual std::shared_ptr<void> create_request() = 0; virtual std::shared_ptr<void> create_request() = 0;
@ -84,7 +85,7 @@ protected:
std::shared_ptr<rcl_node_t> node_handle_; std::shared_ptr<rcl_node_t> node_handle_;
rcl_service_t * service_handle_ = nullptr; std::shared_ptr<rcl_service_t> service_handle_;
std::string service_name_; std::string service_name_;
bool owns_rcl_handle_ = true; bool owns_rcl_handle_ = true;
}; };
@ -115,12 +116,32 @@ public:
using rosidl_typesupport_cpp::get_service_type_support_handle; using rosidl_typesupport_cpp::get_service_type_support_handle;
auto service_type_support_handle = get_service_type_support_handle<ServiceT>(); auto service_type_support_handle = get_service_type_support_handle<ServiceT>();
std::weak_ptr<rcl_node_t> weak_node_handle(node_handle_);
// rcl does the static memory allocation here // rcl does the static memory allocation here
service_handle_ = new rcl_service_t; service_handle_ = std::shared_ptr<rcl_service_t>(
*service_handle_ = rcl_get_zero_initialized_service(); new rcl_service_t, [weak_node_handle](rcl_service_t * service)
{
auto handle = weak_node_handle.lock();
if (handle) {
if (rcl_service_fini(service, handle.get()) != RCL_RET_OK) {
RCLCPP_ERROR(
rclcpp::get_logger(rcl_node_get_logger_name(handle.get())).get_child("rclcpp"),
"Error in destruction of rcl service handle: %s",
rcl_get_error_string_safe());
rcl_reset_error();
}
} else {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"Error in destruction of rcl service handle: "
"the Node Handle was destructed too early. You will leak memory");
}
delete service;
});
*service_handle_.get() = rcl_get_zero_initialized_service();
rcl_ret_t ret = rcl_service_init( rcl_ret_t ret = rcl_service_init(
service_handle_, service_handle_.get(),
node_handle.get(), node_handle.get(),
service_type_support_handle, service_type_support_handle,
service_name.c_str(), service_name.c_str(),
@ -141,6 +162,29 @@ public:
} }
} }
Service(
std::shared_ptr<rcl_node_t> node_handle,
std::shared_ptr<rcl_service_t> service_handle,
AnyServiceCallback<ServiceT> any_callback)
: ServiceBase(node_handle),
any_callback_(any_callback)
{
// check if service handle was initialized
if (!rcl_service_is_valid(service_handle.get(), nullptr)) {
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
throw std::runtime_error(
std::string("rcl_service_t in constructor argument must be initialized beforehand."));
// *INDENT-ON*
}
const char * service_name = rcl_service_get_service_name(service_handle.get());
if (!service_name) {
throw std::runtime_error("failed to get service name");
}
service_handle_ = service_handle;
service_name_ = std::string(service_name);
}
Service( Service(
std::shared_ptr<rcl_node_t> node_handle, std::shared_ptr<rcl_node_t> node_handle,
rcl_service_t * service_handle, rcl_service_t * service_handle,
@ -149,10 +193,7 @@ public:
any_callback_(any_callback) any_callback_(any_callback)
{ {
// check if service handle was initialized // check if service handle was initialized
// TODO(karsten1987): Take this verification if (!rcl_service_is_valid(service_handle, nullptr)) {
// directly in rcl_*_t
// see: https://github.com/ros2/rcl/issues/81
if (!service_handle->impl) {
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here) // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
throw std::runtime_error( throw std::runtime_error(
std::string("rcl_service_t in constructor argument must be initialized beforehand.")); std::string("rcl_service_t in constructor argument must be initialized beforehand."));
@ -163,26 +204,17 @@ public:
if (!service_name) { if (!service_name) {
throw std::runtime_error("failed to get service name"); throw std::runtime_error("failed to get service name");
} }
service_handle_ = service_handle;
service_name_ = std::string(service_name); service_name_ = std::string(service_name);
owns_rcl_handle_ = false;
// In this case, rcl owns the service handle memory
service_handle_ = std::shared_ptr<rcl_service_t>(new rcl_service_t);
service_handle_->impl = service_handle->impl;
} }
Service() = delete; Service() = delete;
virtual ~Service() virtual ~Service()
{ {
// check if you have ownership of the handle
if (owns_rcl_handle_) {
if (rcl_service_fini(service_handle_, node_handle_.get()) != RCL_RET_OK) {
std::stringstream ss;
ss << "Error in destruction of rcl service_handle_ handle: " <<
rcl_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
rcl_reset_error();
}
delete service_handle_;
}
} }
std::shared_ptr<void> create_request() std::shared_ptr<void> create_request()
@ -211,7 +243,7 @@ public:
std::shared_ptr<rmw_request_id_t> req_id, std::shared_ptr<rmw_request_id_t> req_id,
std::shared_ptr<typename ServiceT::Response> response) std::shared_ptr<typename ServiceT::Response> response)
{ {
rcl_ret_t status = rcl_send_response(get_service_handle(), req_id.get(), response.get()); rcl_ret_t status = rcl_send_response(get_service_handle().get(), req_id.get(), response.get());
if (status != RCL_RET_OK) { if (status != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to send response"); rclcpp::exceptions::throw_from_rcl_error(status, "failed to send response");

View file

@ -98,22 +98,22 @@ public:
{ {
for (size_t i = 0; i < wait_set->size_of_subscriptions; ++i) { for (size_t i = 0; i < wait_set->size_of_subscriptions; ++i) {
if (!wait_set->subscriptions[i]) { if (!wait_set->subscriptions[i]) {
subscription_handles_[i] = nullptr; subscription_handles_[i].reset();
} }
} }
for (size_t i = 0; i < wait_set->size_of_services; ++i) { for (size_t i = 0; i < wait_set->size_of_services; ++i) {
if (!wait_set->services[i]) { if (!wait_set->services[i]) {
service_handles_[i] = nullptr; service_handles_[i].reset();
} }
} }
for (size_t i = 0; i < wait_set->size_of_clients; ++i) { for (size_t i = 0; i < wait_set->size_of_clients; ++i) {
if (!wait_set->clients[i]) { if (!wait_set->clients[i]) {
client_handles_[i] = nullptr; client_handles_[i].reset();
} }
} }
for (size_t i = 0; i < wait_set->size_of_timers; ++i) { for (size_t i = 0; i < wait_set->size_of_timers; ++i) {
if (!wait_set->timers[i]) { if (!wait_set->timers[i]) {
timer_handles_[i] = nullptr; timer_handles_[i].reset();
} }
} }
@ -188,7 +188,7 @@ public:
bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) bool add_handles_to_wait_set(rcl_wait_set_t * wait_set)
{ {
for (auto subscription : subscription_handles_) { for (auto subscription : subscription_handles_) {
if (rcl_wait_set_add_subscription(wait_set, subscription) != RCL_RET_OK) { if (rcl_wait_set_add_subscription(wait_set, subscription.get()) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED( RCUTILS_LOG_ERROR_NAMED(
"rclcpp", "rclcpp",
"Couldn't add subscription to wait set: %s", rcl_get_error_string_safe()); "Couldn't add subscription to wait set: %s", rcl_get_error_string_safe());
@ -197,7 +197,7 @@ public:
} }
for (auto client : client_handles_) { for (auto client : client_handles_) {
if (rcl_wait_set_add_client(wait_set, client) != RCL_RET_OK) { if (rcl_wait_set_add_client(wait_set, client.get()) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED( RCUTILS_LOG_ERROR_NAMED(
"rclcpp", "rclcpp",
"Couldn't add client to wait set: %s", rcl_get_error_string_safe()); "Couldn't add client to wait set: %s", rcl_get_error_string_safe());
@ -206,7 +206,7 @@ public:
} }
for (auto service : service_handles_) { for (auto service : service_handles_) {
if (rcl_wait_set_add_service(wait_set, service) != RCL_RET_OK) { if (rcl_wait_set_add_service(wait_set, service.get()) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED( RCUTILS_LOG_ERROR_NAMED(
"rclcpp", "rclcpp",
"Couldn't add service to wait set: %s", rcl_get_error_string_safe()); "Couldn't add service to wait set: %s", rcl_get_error_string_safe());
@ -215,7 +215,7 @@ public:
} }
for (auto timer : timer_handles_) { for (auto timer : timer_handles_) {
if (rcl_wait_set_add_timer(wait_set, timer) != RCL_RET_OK) { if (rcl_wait_set_add_timer(wait_set, timer.get()) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED( RCUTILS_LOG_ERROR_NAMED(
"rclcpp", "rclcpp",
"Couldn't add timer to wait set: %s", rcl_get_error_string_safe()); "Couldn't add timer to wait set: %s", rcl_get_error_string_safe());
@ -391,10 +391,10 @@ private:
VectorRebind<const rcl_guard_condition_t *> guard_conditions_; VectorRebind<const rcl_guard_condition_t *> guard_conditions_;
VectorRebind<const rcl_subscription_t *> subscription_handles_; VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
VectorRebind<const rcl_service_t *> service_handles_; VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
VectorRebind<const rcl_client_t *> client_handles_; VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
VectorRebind<const rcl_timer_t *> timer_handles_; VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
std::shared_ptr<ExecAlloc> executable_allocator_; std::shared_ptr<ExecAlloc> executable_allocator_;
std::shared_ptr<VoidAlloc> allocator_; std::shared_ptr<VoidAlloc> allocator_;

View file

@ -76,15 +76,15 @@ public:
get_topic_name() const; get_topic_name() const;
RCLCPP_PUBLIC RCLCPP_PUBLIC
rcl_subscription_t * std::shared_ptr<rcl_subscription_t>
get_subscription_handle(); get_subscription_handle();
RCLCPP_PUBLIC RCLCPP_PUBLIC
const rcl_subscription_t * const std::shared_ptr<rcl_subscription_t>
get_subscription_handle() const; get_subscription_handle() const;
RCLCPP_PUBLIC RCLCPP_PUBLIC
virtual const rcl_subscription_t * virtual const std::shared_ptr<rcl_subscription_t>
get_intra_process_subscription_handle() const; get_intra_process_subscription_handle() const;
/// Borrow a new message. /// Borrow a new message.
@ -110,8 +110,8 @@ public:
const rmw_message_info_t & message_info) = 0; const rmw_message_info_t & message_info) = 0;
protected: protected:
rcl_subscription_t intra_process_subscription_handle_ = rcl_get_zero_initialized_subscription(); std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
rcl_subscription_t subscription_handle_ = rcl_get_zero_initialized_subscription(); std::shared_ptr<rcl_subscription_t> subscription_handle_;
std::shared_ptr<rcl_node_t> node_handle_; std::shared_ptr<rcl_node_t> node_handle_;
private: private:
@ -241,7 +241,7 @@ public:
{ {
std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra"; std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra";
rcl_ret_t ret = rcl_subscription_init( rcl_ret_t ret = rcl_subscription_init(
&intra_process_subscription_handle_, intra_process_subscription_handle_.get(),
node_handle_.get(), node_handle_.get(),
rclcpp::type_support::get_intra_process_message_msg_type_support(), rclcpp::type_support::get_intra_process_message_msg_type_support(),
intra_process_topic_name.c_str(), intra_process_topic_name.c_str(),
@ -266,13 +266,13 @@ public:
} }
/// Implemenation detail. /// Implemenation detail.
const rcl_subscription_t * const std::shared_ptr<rcl_subscription_t>
get_intra_process_subscription_handle() const get_intra_process_subscription_handle() const
{ {
if (!get_intra_process_message_callback_) { if (!get_intra_process_message_callback_) {
return nullptr; return nullptr;
} }
return &intra_process_subscription_handle_; return intra_process_subscription_handle_;
} }
private: private:

View file

@ -62,7 +62,7 @@ public:
execute_callback() = 0; execute_callback() = 0;
RCLCPP_PUBLIC RCLCPP_PUBLIC
const rcl_timer_t * std::shared_ptr<const rcl_timer_t>
get_timer_handle(); get_timer_handle();
/// Check how long the timer has until its next scheduled callback. /// Check how long the timer has until its next scheduled callback.
@ -85,7 +85,7 @@ public:
bool is_ready(); bool is_ready();
protected: protected:
rcl_timer_t timer_handle_ = rcl_get_zero_initialized_timer(); std::shared_ptr<rcl_timer_t> timer_handle_;
}; };
@ -122,15 +122,12 @@ public:
{ {
// Stop the timer from running. // Stop the timer from running.
cancel(); cancel();
if (rcl_timer_fini(&timer_handle_) != RCL_RET_OK) {
fprintf(stderr, "Failed to clean up rcl timer handle: %s\n", rcl_get_error_string_safe());
}
} }
void void
execute_callback() execute_callback()
{ {
rcl_ret_t ret = rcl_timer_call(&timer_handle_); rcl_ret_t ret = rcl_timer_call(timer_handle_.get());
if (ret == RCL_RET_TIMER_CANCELED) { if (ret == RCL_RET_TIMER_CANCELED) {
return; return;
} }

View file

@ -26,6 +26,7 @@
#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp" #include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/utilities.hpp" #include "rclcpp/utilities.hpp"
#include "rclcpp/logging.hpp"
using rclcpp::ClientBase; using rclcpp::ClientBase;
using rclcpp::exceptions::InvalidNodeError; using rclcpp::exceptions::InvalidNodeError;
@ -38,9 +39,35 @@ ClientBase::ClientBase(
: node_graph_(node_graph), : node_graph_(node_graph),
node_handle_(node_base->get_shared_rcl_node_handle()), node_handle_(node_base->get_shared_rcl_node_handle()),
service_name_(service_name) service_name_(service_name)
{} {
std::weak_ptr<rcl_node_t> weak_node_handle(node_handle_);
client_handle_ = std::shared_ptr<rcl_client_t>(
new rcl_client_t, [weak_node_handle](rcl_client_t * client)
{
auto handle = weak_node_handle.lock();
if (handle) {
if (rcl_client_fini(client, handle.get()) != RCL_RET_OK) {
RCLCPP_ERROR(
rclcpp::get_logger(rcl_node_get_logger_name(handle.get())).get_child("rclcpp"),
"Error in destruction of rcl client handle: %s", rcl_get_error_string_safe());
rcl_reset_error();
}
} else {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"Error in destruction of rcl client handle: "
"the Node Handle was destructed too early. You will leak memory");
}
delete client;
});
*client_handle_.get() = rcl_get_zero_initialized_client();
}
ClientBase::~ClientBase() {} ClientBase::~ClientBase()
{
// Make sure the client handle is destructed as early as possible and before the node handle
client_handle_.reset();
}
const std::string & const std::string &
ClientBase::get_service_name() const ClientBase::get_service_name() const
@ -48,24 +75,26 @@ ClientBase::get_service_name() const
return this->service_name_; return this->service_name_;
} }
rcl_client_t * std::shared_ptr<rcl_client_t>
ClientBase::get_client_handle() ClientBase::get_client_handle()
{ {
return &client_handle_; return client_handle_;
} }
const rcl_client_t * std::shared_ptr<const rcl_client_t>
ClientBase::get_client_handle() const ClientBase::get_client_handle() const
{ {
return &client_handle_; return client_handle_;
} }
bool bool
ClientBase::service_is_ready() const ClientBase::service_is_ready() const
{ {
bool is_ready; bool is_ready;
rcl_ret_t ret = rcl_ret_t ret = rcl_service_server_is_available(
rcl_service_server_is_available(this->get_rcl_node_handle(), &client_handle_, &is_ready); this->get_rcl_node_handle(),
this->get_client_handle().get(),
&is_ready);
if (ret != RCL_RET_OK) { if (ret != RCL_RET_OK) {
throw_from_rcl_error(ret, "rcl_service_server_is_available failed"); throw_from_rcl_error(ret, "rcl_service_server_is_available failed");
} }

View file

@ -280,7 +280,7 @@ Executor::execute_subscription(
std::shared_ptr<void> message = subscription->create_message(); std::shared_ptr<void> message = subscription->create_message();
rmw_message_info_t message_info; rmw_message_info_t message_info;
auto ret = rcl_take(subscription->get_subscription_handle(), auto ret = rcl_take(subscription->get_subscription_handle().get(),
message.get(), &message_info); message.get(), &message_info);
if (ret == RCL_RET_OK) { if (ret == RCL_RET_OK) {
message_info.from_intra_process = false; message_info.from_intra_process = false;
@ -302,7 +302,7 @@ Executor::execute_intra_process_subscription(
rcl_interfaces::msg::IntraProcessMessage ipm; rcl_interfaces::msg::IntraProcessMessage ipm;
rmw_message_info_t message_info; rmw_message_info_t message_info;
rcl_ret_t status = rcl_take( rcl_ret_t status = rcl_take(
subscription->get_intra_process_subscription_handle(), subscription->get_intra_process_subscription_handle().get(),
&ipm, &ipm,
&message_info); &message_info);
@ -332,7 +332,7 @@ Executor::execute_service(
auto request_header = service->create_request_header(); auto request_header = service->create_request_header();
std::shared_ptr<void> request = service->create_request(); std::shared_ptr<void> request = service->create_request();
rcl_ret_t status = rcl_take_request( rcl_ret_t status = rcl_take_request(
service->get_service_handle(), service->get_service_handle().get(),
request_header.get(), request_header.get(),
request.get()); request.get());
if (status == RCL_RET_OK) { if (status == RCL_RET_OK) {
@ -353,7 +353,7 @@ Executor::execute_client(
auto request_header = client->create_request_header(); auto request_header = client->create_request_header();
std::shared_ptr<void> response = client->create_response(); std::shared_ptr<void> response = client->create_response();
rcl_ret_t status = rcl_take_response( rcl_ret_t status = rcl_take_response(
client->get_client_handle(), client->get_client_handle().get(),
request_header.get(), request_header.get(),
response.get()); response.get());
if (status == RCL_RET_OK) { if (status == RCL_RET_OK) {

View file

@ -13,12 +13,14 @@
// limitations under the License. // limitations under the License.
#include "rclcpp/memory_strategy.hpp" #include "rclcpp/memory_strategy.hpp"
#include <memory>
using rclcpp::memory_strategy::MemoryStrategy; using rclcpp::memory_strategy::MemoryStrategy;
rclcpp::SubscriptionBase::SharedPtr rclcpp::SubscriptionBase::SharedPtr
MemoryStrategy::get_subscription_by_handle( MemoryStrategy::get_subscription_by_handle(
const rcl_subscription_t * subscriber_handle, const WeakNodeVector & weak_nodes) std::shared_ptr<const rcl_subscription_t> subscriber_handle,
const WeakNodeVector & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -48,7 +50,7 @@ MemoryStrategy::get_subscription_by_handle(
rclcpp::ServiceBase::SharedPtr rclcpp::ServiceBase::SharedPtr
MemoryStrategy::get_service_by_handle( MemoryStrategy::get_service_by_handle(
const rcl_service_t * service_handle, std::shared_ptr<const rcl_service_t> service_handle,
const WeakNodeVector & weak_nodes) const WeakNodeVector & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
@ -74,7 +76,7 @@ MemoryStrategy::get_service_by_handle(
rclcpp::ClientBase::SharedPtr rclcpp::ClientBase::SharedPtr
MemoryStrategy::get_client_by_handle( MemoryStrategy::get_client_by_handle(
const rcl_client_t * client_handle, std::shared_ptr<const rcl_client_t> client_handle,
const WeakNodeVector & weak_nodes) const WeakNodeVector & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {

View file

@ -46,13 +46,13 @@ ServiceBase::get_service_name()
return this->service_name_; return this->service_name_;
} }
rcl_service_t * std::shared_ptr<rcl_service_t>
ServiceBase::get_service_handle() ServiceBase::get_service_handle()
{ {
return service_handle_; return service_handle_;
} }
const rcl_service_t * std::shared_ptr<const rcl_service_t>
ServiceBase::get_service_handle() const ServiceBase::get_service_handle() const
{ {
return service_handle_; return service_handle_;

View file

@ -20,11 +20,11 @@
#include "rclcpp/exceptions.hpp" #include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/logging.hpp"
#include "rmw/error_handling.h" #include "rmw/error_handling.h"
#include "rmw/rmw.h" #include "rmw/rmw.h"
using rclcpp::SubscriptionBase; using rclcpp::SubscriptionBase;
SubscriptionBase::SubscriptionBase( SubscriptionBase::SubscriptionBase(
@ -34,8 +34,37 @@ SubscriptionBase::SubscriptionBase(
const rcl_subscription_options_t & subscription_options) const rcl_subscription_options_t & subscription_options)
: node_handle_(node_handle) : node_handle_(node_handle)
{ {
std::weak_ptr<rcl_node_t> weak_node_handle(node_handle_);
auto custom_deletor = [weak_node_handle](rcl_subscription_t * rcl_subs)
{
auto handle = weak_node_handle.lock();
if (handle) {
if (rcl_subscription_fini(rcl_subs, handle.get()) != RCL_RET_OK) {
RCLCPP_ERROR(
rclcpp::get_logger(rcl_node_get_logger_name(handle.get())).get_child("rclcpp"),
"Error in destruction of rcl subscription handle: %s",
rcl_get_error_string_safe());
rcl_reset_error();
}
} else {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"Error in destruction of rcl subscription handle: "
"the Node Handle was destructed too early. You will leak memory");
}
delete rcl_subs;
};
subscription_handle_ = std::shared_ptr<rcl_subscription_t>(
new rcl_subscription_t, custom_deletor);
*subscription_handle_.get() = rcl_get_zero_initialized_subscription();
intra_process_subscription_handle_ = std::shared_ptr<rcl_subscription_t>(
new rcl_subscription_t, custom_deletor);
*intra_process_subscription_handle_.get() = rcl_get_zero_initialized_subscription();
rcl_ret_t ret = rcl_subscription_init( rcl_ret_t ret = rcl_subscription_init(
&subscription_handle_, subscription_handle_.get(),
node_handle_.get(), node_handle_.get(),
&type_support_handle, &type_support_handle,
topic_name.c_str(), topic_name.c_str(),
@ -57,42 +86,28 @@ SubscriptionBase::SubscriptionBase(
SubscriptionBase::~SubscriptionBase() SubscriptionBase::~SubscriptionBase()
{ {
if (rcl_subscription_fini(&subscription_handle_, node_handle_.get()) != RCL_RET_OK) {
std::stringstream ss;
ss << "Error in destruction of rcl subscription handle: " <<
rcl_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
if (rcl_subscription_fini(
&intra_process_subscription_handle_, node_handle_.get()) != RCL_RET_OK)
{
std::stringstream ss;
ss << "Error in destruction of rmw intra process subscription handle: " <<
rcl_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
} }
const char * const char *
SubscriptionBase::get_topic_name() const SubscriptionBase::get_topic_name() const
{ {
return rcl_subscription_get_topic_name(&subscription_handle_); return rcl_subscription_get_topic_name(subscription_handle_.get());
} }
rcl_subscription_t * std::shared_ptr<rcl_subscription_t>
SubscriptionBase::get_subscription_handle() SubscriptionBase::get_subscription_handle()
{ {
return &subscription_handle_; return subscription_handle_;
} }
const rcl_subscription_t * const std::shared_ptr<rcl_subscription_t>
SubscriptionBase::get_subscription_handle() const SubscriptionBase::get_subscription_handle() const
{ {
return &subscription_handle_; return subscription_handle_;
} }
const rcl_subscription_t * const std::shared_ptr<rcl_subscription_t>
SubscriptionBase::get_intra_process_subscription_handle() const SubscriptionBase::get_intra_process_subscription_handle() const
{ {
return &intra_process_subscription_handle_; return intra_process_subscription_handle_;
} }

View file

@ -16,16 +16,36 @@
#include <chrono> #include <chrono>
#include <string> #include <string>
#include <memory>
#include "rcutils/logging_macros.h"
using rclcpp::TimerBase; using rclcpp::TimerBase;
TimerBase::TimerBase(std::chrono::nanoseconds period) TimerBase::TimerBase(std::chrono::nanoseconds period)
{ {
timer_handle_ = std::shared_ptr<rcl_timer_t>(
new rcl_timer_t, [ = ](rcl_timer_t * timer)
{
if (rcl_timer_fini(timer) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to clean up rcl timer handle: %s", rcl_get_error_string_safe());
rcl_reset_error();
}
delete timer;
});
*timer_handle_.get() = rcl_get_zero_initialized_timer();
if (rcl_timer_init( if (rcl_timer_init(
&timer_handle_, period.count(), nullptr, timer_handle_.get(), period.count(), nullptr,
rcl_get_default_allocator()) != RCL_RET_OK) rcl_get_default_allocator()) != RCL_RET_OK)
{ {
fprintf(stderr, "Couldn't initialize rcl timer handle: %s\n", rcl_get_error_string_safe()); RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Couldn't initialize rcl timer handle: %s\n", rcl_get_error_string_safe());
rcl_reset_error();
} }
} }
@ -35,7 +55,7 @@ TimerBase::~TimerBase()
void void
TimerBase::cancel() TimerBase::cancel()
{ {
if (rcl_timer_cancel(&timer_handle_) != RCL_RET_OK) { if (rcl_timer_cancel(timer_handle_.get()) != RCL_RET_OK) {
throw std::runtime_error(std::string("Couldn't cancel timer: ") + rcl_get_error_string_safe()); throw std::runtime_error(std::string("Couldn't cancel timer: ") + rcl_get_error_string_safe());
} }
} }
@ -43,7 +63,7 @@ TimerBase::cancel()
void void
TimerBase::reset() TimerBase::reset()
{ {
if (rcl_timer_reset(&timer_handle_) != RCL_RET_OK) { if (rcl_timer_reset(timer_handle_.get()) != RCL_RET_OK) {
throw std::runtime_error(std::string("Couldn't reset timer: ") + rcl_get_error_string_safe()); throw std::runtime_error(std::string("Couldn't reset timer: ") + rcl_get_error_string_safe());
} }
} }
@ -52,7 +72,7 @@ bool
TimerBase::is_ready() TimerBase::is_ready()
{ {
bool ready = false; bool ready = false;
if (rcl_timer_is_ready(&timer_handle_, &ready) != RCL_RET_OK) { if (rcl_timer_is_ready(timer_handle_.get(), &ready) != RCL_RET_OK) {
throw std::runtime_error(std::string("Failed to check timer: ") + rcl_get_error_string_safe()); throw std::runtime_error(std::string("Failed to check timer: ") + rcl_get_error_string_safe());
} }
return ready; return ready;
@ -62,7 +82,9 @@ std::chrono::nanoseconds
TimerBase::time_until_trigger() TimerBase::time_until_trigger()
{ {
int64_t time_until_next_call = 0; int64_t time_until_next_call = 0;
if (rcl_timer_get_time_until_next_call(&timer_handle_, &time_until_next_call) != RCL_RET_OK) { if (rcl_timer_get_time_until_next_call(timer_handle_.get(),
&time_until_next_call) != RCL_RET_OK)
{
throw std::runtime_error( throw std::runtime_error(
std::string("Timer could not get time until next call: ") + std::string("Timer could not get time until next call: ") +
rcl_get_error_string_safe()); rcl_get_error_string_safe());
@ -70,8 +92,8 @@ TimerBase::time_until_trigger()
return std::chrono::nanoseconds(time_until_next_call); return std::chrono::nanoseconds(time_until_next_call);
} }
const rcl_timer_t * std::shared_ptr<const rcl_timer_t>
TimerBase::get_timer_handle() TimerBase::get_timer_handle()
{ {
return &timer_handle_; return timer_handle_;
} }

View file

@ -106,6 +106,15 @@ TEST_F(TestExternallyDefinedServices, extern_defined_initialized) {
return; return;
} }
// Destruct the service
ret = rcl_service_fini(
&service_handle,
node_handle->get_node_base_interface()->get_rcl_node_handle());
if (ret != RCL_RET_OK) {
FAIL();
return;
}
SUCCEED(); SUCCEED();
} }
@ -139,5 +148,15 @@ TEST_F(TestExternallyDefinedServices, extern_defined_destructor) {
FAIL(); FAIL();
return; return;
} }
// Destruct the service
ret = rcl_service_fini(
&service_handle,
node_handle->get_node_base_interface()->get_rcl_node_handle());
if (ret != RCL_RET_OK) {
FAIL();
return;
}
SUCCEED(); SUCCEED();
} }

View file

@ -52,6 +52,9 @@ TEST_F(TestFindWeakNodes, allocator_strategy_with_weak_nodes) {
// THEN // THEN
// The result of finding dangling node pointers should be true // The result of finding dangling node pointers should be true
ASSERT_TRUE(has_invalid_weak_nodes); ASSERT_TRUE(has_invalid_weak_nodes);
// Prevent memory leak due to the order of destruction
memory_strategy->clear_handles();
} }
TEST_F(TestFindWeakNodes, allocator_strategy_no_weak_nodes) { TEST_F(TestFindWeakNodes, allocator_strategy_no_weak_nodes) {
@ -73,4 +76,7 @@ TEST_F(TestFindWeakNodes, allocator_strategy_no_weak_nodes) {
// THEN // THEN
// The result of finding dangling node pointers should be false // The result of finding dangling node pointers should be false
ASSERT_FALSE(has_invalid_weak_nodes); ASSERT_FALSE(has_invalid_weak_nodes);
// Prevent memory leak due to the order of destruction
memory_strategy->clear_handles();
} }

View file

@ -68,7 +68,7 @@ public:
void TearDown() void TearDown()
{ {
rcutils_logging_set_output_handler(this->previous_output_handler); rcutils_logging_set_output_handler(this->previous_output_handler);
g_rcutils_logging_initialized = false; ASSERT_EQ(RCUTILS_RET_OK, rcutils_logging_shutdown());
EXPECT_FALSE(g_rcutils_logging_initialized); EXPECT_FALSE(g_rcutils_logging_initialized);
} }
}; };