Fix action server deadlock (#1285) (#1313) (#1518)

* Add missing locking to the rclcpp_action::ServerBase. (#1421)

This patch actually does 4 related things:

1.  Renames the recursive mutex in the ServerBaseImpl class
to action_server_reentrant_mutex_, which makes it a lot
clearer what it is meant to lock.
2.  Adds some additional error checking where checks were missed.
3.  Adds a lock to publish_status so that the action_server
structure is protected.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>

* [backport] Fix action server deadlock (#1285, #1313)

Signed-off-by: Daisuke Sato <daisukes@cmu.edu>

* revert comment

Signed-off-by: Daisuke Sato <daisukes@cmu.edu>

Co-authored-by: Chris Lalancette <clalancette@openrobotics.org>
This commit is contained in:
Daisuke Sato 2021-01-20 16:51:10 -05:00 committed by GitHub
parent 234b5e423b
commit 6756ccfb50
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -43,8 +43,8 @@ public:
{ {
} }
// Lock everything except user callbacks // Lock for action_server_
std::recursive_mutex reentrant_mutex_; std::recursive_mutex action_server_reentrant_mutex_;
std::shared_ptr<rcl_action_server_t> action_server_; std::shared_ptr<rcl_action_server_t> action_server_;
@ -56,10 +56,13 @@ public:
size_t num_services_ = 0; size_t num_services_ = 0;
size_t num_guard_conditions_ = 0; size_t num_guard_conditions_ = 0;
bool goal_request_ready_ = false; std::atomic<bool> goal_request_ready_{false};
bool cancel_request_ready_ = false; std::atomic<bool> cancel_request_ready_{false};
bool result_request_ready_ = false; std::atomic<bool> result_request_ready_{false};
bool goal_expired_ = false; std::atomic<bool> goal_expired_{false};
// Lock for unordered_maps
std::recursive_mutex unordered_map_mutex_;
// Results to be kept until the goal expires after reaching a terminal state // Results to be kept until the goal expires after reaching a terminal state
std::unordered_map<GoalUUID, std::shared_ptr<void>> goal_results_; std::unordered_map<GoalUUID, std::shared_ptr<void>> goal_results_;
@ -88,12 +91,13 @@ ServerBase::ServerBase(
if (nullptr != ptr) { if (nullptr != ptr) {
rcl_node_t * rcl_node = node_base->get_rcl_node_handle(); rcl_node_t * rcl_node = node_base->get_rcl_node_handle();
rcl_ret_t ret = rcl_action_server_fini(ptr, rcl_node); rcl_ret_t ret = rcl_action_server_fini(ptr, rcl_node);
(void)ret; if (RCL_RET_OK != ret) {
RCLCPP_DEBUG( RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"), rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_server_t in deleter"); "failed to fini rcl_action_server_t in deleter");
} }
delete ptr; delete ptr;
}
}; };
pimpl_->action_server_.reset(new rcl_action_server_t, deleter); pimpl_->action_server_.reset(new rcl_action_server_t, deleter);
@ -159,7 +163,7 @@ ServerBase::get_number_of_ready_guard_conditions()
bool bool
ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set) ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
{ {
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_); std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_wait_set_add_action_server( rcl_ret_t ret = rcl_action_wait_set_add_action_server(
wait_set, pimpl_->action_server_.get(), NULL); wait_set, pimpl_->action_server_.get(), NULL);
return RCL_RET_OK == ret; return RCL_RET_OK == ret;
@ -168,35 +172,47 @@ ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
bool bool
ServerBase::is_ready(rcl_wait_set_t * wait_set) ServerBase::is_ready(rcl_wait_set_t * wait_set)
{ {
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_); bool goal_request_ready;
rcl_ret_t ret = rcl_action_server_wait_set_get_entities_ready( bool cancel_request_ready;
bool result_request_ready;
bool goal_expired;
rcl_ret_t ret;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_server_wait_set_get_entities_ready(
wait_set, wait_set,
pimpl_->action_server_.get(), pimpl_->action_server_.get(),
&pimpl_->goal_request_ready_, &goal_request_ready,
&pimpl_->cancel_request_ready_, &cancel_request_ready,
&pimpl_->result_request_ready_, &result_request_ready,
&pimpl_->goal_expired_); &goal_expired);
}
pimpl_->goal_request_ready_ = goal_request_ready;
pimpl_->cancel_request_ready_ = cancel_request_ready;
pimpl_->result_request_ready_ = result_request_ready;
pimpl_->goal_expired_ = goal_expired;
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret); rclcpp::exceptions::throw_from_rcl_error(ret);
} }
return pimpl_->goal_request_ready_ || return pimpl_->goal_request_ready_.load() ||
pimpl_->cancel_request_ready_ || pimpl_->cancel_request_ready_.load() ||
pimpl_->result_request_ready_ || pimpl_->result_request_ready_.load() ||
pimpl_->goal_expired_; pimpl_->goal_expired_.load();
} }
void void
ServerBase::execute() ServerBase::execute()
{ {
if (pimpl_->goal_request_ready_) { if (pimpl_->goal_request_ready_.load()) {
execute_goal_request_received(); execute_goal_request_received();
} else if (pimpl_->cancel_request_ready_) { } else if (pimpl_->cancel_request_ready_.load()) {
execute_cancel_request_received(); execute_cancel_request_received();
} else if (pimpl_->result_request_ready_) { } else if (pimpl_->result_request_ready_.load()) {
execute_result_request_received(); execute_result_request_received();
} else if (pimpl_->goal_expired_) { } else if (pimpl_->goal_expired_.load()) {
execute_check_expired_goals(); execute_check_expired_goals();
} else { } else {
throw std::runtime_error("Executing action server but nothing is ready"); throw std::runtime_error("Executing action server but nothing is ready");
@ -210,15 +226,19 @@ ServerBase::execute_goal_request_received()
rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info(); rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info();
rmw_request_id_t request_header; rmw_request_id_t request_header;
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
std::shared_ptr<void> message = create_goal_request(); std::shared_ptr<void> message = create_goal_request();
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_goal_request( ret = rcl_action_take_goal_request(
pimpl_->action_server_.get(), pimpl_->action_server_.get(),
&request_header, &request_header,
message.get()); message.get());
}
pimpl_->goal_request_ready_ = false; bool expected = true;
if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) {
return;
}
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data. // Ignore take failure because connext fails if it receives a sample without valid data.
@ -235,10 +255,13 @@ ServerBase::execute_goal_request_received()
// Call user's callback, getting the user's response and a ros message to send back // Call user's callback, getting the user's response and a ros message to send back
auto response_pair = call_handle_goal_callback(uuid, message); auto response_pair = call_handle_goal_callback(uuid, message);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_goal_response( ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(), pimpl_->action_server_.get(),
&request_header, &request_header,
response_pair.second.get()); response_pair.second.get());
}
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret); rclcpp::exceptions::throw_from_rcl_error(ret);
@ -254,15 +277,19 @@ ServerBase::execute_goal_request_received()
{ {
if (nullptr != ptr) { if (nullptr != ptr) {
rcl_ret_t fail_ret = rcl_action_goal_handle_fini(ptr); rcl_ret_t fail_ret = rcl_action_goal_handle_fini(ptr);
(void)fail_ret; if (RCL_RET_OK != fail_ret) {
RCLCPP_DEBUG( RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"), rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_goal_handle_t in deleter"); "failed to fini rcl_action_goal_handle_t in deleter");
}
delete ptr; delete ptr;
} }
}; };
rcl_action_goal_handle_t * rcl_handle; rcl_action_goal_handle_t * rcl_handle;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info); rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
}
if (!rcl_handle) { if (!rcl_handle) {
throw std::runtime_error("Failed to accept new goal\n"); throw std::runtime_error("Failed to accept new goal\n");
} }
@ -271,7 +298,11 @@ ServerBase::execute_goal_request_received()
// Copy out goal handle since action server storage disappears when it is fini'd // Copy out goal handle since action server storage disappears when it is fini'd
*handle = *rcl_handle; *handle = *rcl_handle;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_handles_[uuid] = handle; pimpl_->goal_handles_[uuid] = handle;
}
if (GoalResponse::ACCEPT_AND_EXECUTE == status) { if (GoalResponse::ACCEPT_AND_EXECUTE == status) {
// Change status to executing // Change status to executing
@ -297,13 +328,18 @@ ServerBase::execute_cancel_request_received()
// Initialize cancel request // Initialize cancel request
auto request = std::make_shared<action_msgs::srv::CancelGoal::Request>(); auto request = std::make_shared<action_msgs::srv::CancelGoal::Request>();
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_); {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_cancel_request( ret = rcl_action_take_cancel_request(
pimpl_->action_server_.get(), pimpl_->action_server_.get(),
&request_header, &request_header,
request.get()); request.get());
}
pimpl_->cancel_request_ready_ = false; bool expected = true;
if (!pimpl_->cancel_request_ready_.compare_exchange_strong(expected, false)) {
return;
}
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data. // Ignore take failure because connext fails if it receives a sample without valid data.
@ -323,10 +359,14 @@ ServerBase::execute_cancel_request_received()
// Get a list of goal info that should be attempted to be cancelled // Get a list of goal info that should be attempted to be cancelled
rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response(); rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response();
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_process_cancel_request( ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(), pimpl_->action_server_.get(),
&cancel_request, &cancel_request,
&cancel_response); &cancel_response);
}
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret); rclcpp::exceptions::throw_from_rcl_error(ret);
} }
@ -369,8 +409,12 @@ ServerBase::execute_cancel_request_received()
publish_status(); publish_status();
} }
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_cancel_response( ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get()); pimpl_->action_server_.get(), &request_header, response.get());
}
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret); rclcpp::exceptions::throw_from_rcl_error(ret);
} }
@ -383,11 +427,16 @@ ServerBase::execute_result_request_received()
// Get the result request message // Get the result request message
rmw_request_id_t request_header; rmw_request_id_t request_header;
std::shared_ptr<void> result_request = create_result_request(); std::shared_ptr<void> result_request = create_result_request();
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_); {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_result_request( ret = rcl_action_take_result_request(
pimpl_->action_server_.get(), &request_header, result_request.get()); pimpl_->action_server_.get(), &request_header, result_request.get());
}
pimpl_->result_request_ready_ = false; bool expected = true;
if (!pimpl_->result_request_ready_.compare_exchange_strong(expected, false)) {
return;
}
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data. // Ignore take failure because connext fails if it receives a sample without valid data.
@ -405,7 +454,10 @@ ServerBase::execute_result_request_received()
rcl_action_goal_info_t goal_info; rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info); convert(uuid, &goal_info);
bool goal_exists; bool goal_exists;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info); goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}
if (!goal_exists) { if (!goal_exists) {
// Goal does not exists // Goal does not exists
result_response = create_result_response(action_msgs::msg::GoalStatus::STATUS_UNKNOWN); result_response = create_result_response(action_msgs::msg::GoalStatus::STATUS_UNKNOWN);
@ -419,6 +471,7 @@ ServerBase::execute_result_request_received()
if (result_response) { if (result_response) {
// Send the result now // Send the result now
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_result_response( ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_response.get()); pimpl_->action_server_.get(), &request_header, result_response.get());
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
@ -426,6 +479,7 @@ ServerBase::execute_result_request_received()
} }
} else { } else {
// Store the request so it can be responded to later // Store the request so it can be responded to later
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->result_requests_[uuid].push_back(request_header); pimpl_->result_requests_[uuid].push_back(request_header);
} }
} }
@ -439,9 +493,11 @@ ServerBase::execute_check_expired_goals()
// Loop in case more than 1 goal expired // Loop in case more than 1 goal expired
while (num_expired > 0u) { while (num_expired > 0u) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
rcl_ret_t ret; rcl_ret_t ret;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired); ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired);
}
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret); rclcpp::exceptions::throw_from_rcl_error(ret);
} else if (num_expired) { } else if (num_expired) {
@ -449,6 +505,7 @@ ServerBase::execute_check_expired_goals()
GoalUUID uuid; GoalUUID uuid;
convert(expired_goals[0], &uuid); convert(expired_goals[0], &uuid);
RCLCPP_DEBUG(pimpl_->logger_, "Expired goal %s", to_string(uuid).c_str()); RCLCPP_DEBUG(pimpl_->logger_, "Expired goal %s", to_string(uuid).c_str());
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_.erase(uuid); pimpl_->goal_results_.erase(uuid);
pimpl_->result_requests_.erase(uuid); pimpl_->result_requests_.erase(uuid);
pimpl_->goal_handles_.erase(uuid); pimpl_->goal_handles_.erase(uuid);
@ -461,6 +518,11 @@ ServerBase::publish_status()
{ {
rcl_ret_t ret; rcl_ret_t ret;
// We need to hold the lock across this entire method because
// rcl_action_server_get_goal_handles() returns an internal pointer to the
// goal data.
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
// Get all goal handles known to C action server // Get all goal handles known to C action server
rcl_action_goal_handle_t ** goal_handles = NULL; rcl_action_goal_handle_t ** goal_handles = NULL;
size_t num_goals = 0; size_t num_goals = 0;
@ -516,20 +578,26 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_m
// Check that the goal exists // Check that the goal exists
rcl_action_goal_info_t goal_info; rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info); convert(uuid, &goal_info);
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
bool goal_exists; bool goal_exists;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info); goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}
if (!goal_exists) { if (!goal_exists) {
throw std::runtime_error("Asked to publish result for goal that does not exist"); throw std::runtime_error("Asked to publish result for goal that does not exist");
} }
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_[uuid] = result_msg; pimpl_->goal_results_[uuid] = result_msg;
}
// if there are clients who already asked for the result, send it to them // if there are clients who already asked for the result, send it to them
auto iter = pimpl_->result_requests_.find(uuid); auto iter = pimpl_->result_requests_.find(uuid);
if (iter != pimpl_->result_requests_.end()) { if (iter != pimpl_->result_requests_.end()) {
for (auto & request_header : iter->second) { for (auto & request_header : iter->second) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_send_result_response( rcl_ret_t ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_msg.get()); pimpl_->action_server_.get(), &request_header, result_msg.get());
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
@ -542,7 +610,7 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_m
void void
ServerBase::notify_goal_terminal_state() ServerBase::notify_goal_terminal_state()
{ {
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_); std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_notify_goal_done(pimpl_->action_server_.get()); rcl_ret_t ret = rcl_action_notify_goal_done(pimpl_->action_server_.get());
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret); rclcpp::exceptions::throw_from_rcl_error(ret);
@ -552,7 +620,7 @@ ServerBase::notify_goal_terminal_state()
void void
ServerBase::publish_feedback(std::shared_ptr<void> feedback_msg) ServerBase::publish_feedback(std::shared_ptr<void> feedback_msg)
{ {
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_); std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_publish_feedback(pimpl_->action_server_.get(), feedback_msg.get()); rcl_ret_t ret = rcl_action_publish_feedback(pimpl_->action_server_.get(), feedback_msg.get());
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to publish feedback"); rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to publish feedback");