Ensure compliant subscription API. (#214)
Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
This commit is contained in:
parent
398812c12d
commit
bc2bc9ef1b
1 changed files with 142 additions and 56 deletions
|
@ -2267,32 +2267,41 @@ static rmw_subscription_t * create_subscription(
|
||||||
dds_ppant, dds_sub, type_supports, topic_name, qos_policies,
|
dds_ppant, dds_sub, type_supports, topic_name, qos_policies,
|
||||||
subscription_options->ignore_local_publications)) == nullptr)
|
subscription_options->ignore_local_publications)) == nullptr)
|
||||||
{
|
{
|
||||||
goto fail_common_init;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
auto cleanup_subscription = rcpputils::make_scope_exit(
|
||||||
|
[sub]() {
|
||||||
|
if (dds_delete(sub->rdcondh) < 0) {
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(
|
||||||
|
"failed to delete readcondition during '"
|
||||||
|
RCUTILS_STRINGIFY(__function__) "' cleanup\n");
|
||||||
|
}
|
||||||
|
if (dds_delete(sub->enth) < 0) {
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(
|
||||||
|
"failed to delete reader during '"
|
||||||
|
RCUTILS_STRINGIFY(__function__) "' cleanup\n");
|
||||||
|
}
|
||||||
|
delete sub;
|
||||||
|
});
|
||||||
rmw_subscription = rmw_subscription_allocate();
|
rmw_subscription = rmw_subscription_allocate();
|
||||||
RET_ALLOC_X(rmw_subscription, goto fail_subscription);
|
RET_ALLOC_X(rmw_subscription, return nullptr);
|
||||||
|
auto cleanup_rmw_subscription = rcpputils::make_scope_exit(
|
||||||
|
[rmw_subscription]() {
|
||||||
|
rmw_free(const_cast<char *>(rmw_subscription->topic_name));
|
||||||
|
rmw_subscription_free(rmw_subscription);
|
||||||
|
});
|
||||||
rmw_subscription->implementation_identifier = eclipse_cyclonedds_identifier;
|
rmw_subscription->implementation_identifier = eclipse_cyclonedds_identifier;
|
||||||
rmw_subscription->data = sub;
|
rmw_subscription->data = sub;
|
||||||
rmw_subscription->topic_name =
|
rmw_subscription->topic_name =
|
||||||
reinterpret_cast<const char *>(rmw_allocate(strlen(topic_name) + 1));
|
static_cast<const char *>(rmw_allocate(strlen(topic_name) + 1));
|
||||||
RET_ALLOC_X(rmw_subscription->topic_name, goto fail_topic_name);
|
RET_ALLOC_X(rmw_subscription->topic_name, return nullptr);
|
||||||
memcpy(const_cast<char *>(rmw_subscription->topic_name), topic_name, strlen(topic_name) + 1);
|
memcpy(const_cast<char *>(rmw_subscription->topic_name), topic_name, strlen(topic_name) + 1);
|
||||||
rmw_subscription->options = *subscription_options;
|
rmw_subscription->options = *subscription_options;
|
||||||
rmw_subscription->can_loan_messages = false;
|
rmw_subscription->can_loan_messages = false;
|
||||||
|
|
||||||
|
cleanup_subscription.cancel();
|
||||||
|
cleanup_rmw_subscription.cancel();
|
||||||
return rmw_subscription;
|
return rmw_subscription;
|
||||||
fail_topic_name:
|
|
||||||
rmw_subscription_free(rmw_subscription);
|
|
||||||
fail_subscription:
|
|
||||||
if (dds_delete(sub->rdcondh) < 0) {
|
|
||||||
RCUTILS_LOG_ERROR_NAMED(
|
|
||||||
"rmw_cyclonedds_cpp", "failed to delete readcondition during error handling");
|
|
||||||
}
|
|
||||||
if (dds_delete(sub->enth) < 0) {
|
|
||||||
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling");
|
|
||||||
}
|
|
||||||
delete sub;
|
|
||||||
fail_common_init:
|
|
||||||
return nullptr;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" rmw_subscription_t * rmw_create_subscription(
|
extern "C" rmw_subscription_t * rmw_create_subscription(
|
||||||
|
@ -2300,12 +2309,52 @@ extern "C" rmw_subscription_t * rmw_create_subscription(
|
||||||
const char * topic_name, const rmw_qos_profile_t * qos_policies,
|
const char * topic_name, const rmw_qos_profile_t * qos_policies,
|
||||||
const rmw_subscription_options_t * subscription_options)
|
const rmw_subscription_options_t * subscription_options)
|
||||||
{
|
{
|
||||||
RET_WRONG_IMPLID_X(node, return nullptr);
|
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
|
||||||
|
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
|
||||||
|
node,
|
||||||
|
node->implementation_identifier,
|
||||||
|
eclipse_cyclonedds_identifier,
|
||||||
|
return nullptr);
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, nullptr);
|
||||||
|
if (0 == strlen(topic_name)) {
|
||||||
|
RMW_SET_ERROR_MSG("topic_name argument is an empty string");
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
|
||||||
|
if (!qos_policies->avoid_ros_namespace_conventions) {
|
||||||
|
int validation_result = RMW_TOPIC_VALID;
|
||||||
|
rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr);
|
||||||
|
if (RMW_RET_OK != ret) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
if (RMW_TOPIC_VALID != validation_result) {
|
||||||
|
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
|
||||||
|
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic_name argument: %s", reason);
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr);
|
||||||
|
|
||||||
rmw_subscription_t * sub = create_subscription(
|
rmw_subscription_t * sub = create_subscription(
|
||||||
node->context->impl->ppant, node->context->impl->dds_sub,
|
node->context->impl->ppant, node->context->impl->dds_sub,
|
||||||
type_supports, topic_name, qos_policies,
|
type_supports, topic_name, qos_policies,
|
||||||
subscription_options);
|
subscription_options);
|
||||||
if (sub != nullptr) {
|
if (sub == nullptr) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
auto cleanup_subscription = rcpputils::make_scope_exit(
|
||||||
|
[sub]() {
|
||||||
|
rmw_error_state_t error_state = *rmw_get_error_state();
|
||||||
|
rmw_reset_error();
|
||||||
|
if (RMW_RET_OK != destroy_subscription(sub)) {
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
|
||||||
|
rmw_reset_error();
|
||||||
|
}
|
||||||
|
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
|
||||||
|
});
|
||||||
|
|
||||||
// Update graph
|
// Update graph
|
||||||
auto common = &node->context->impl->common;
|
auto common = &node->context->impl->common;
|
||||||
const auto cddssub = static_cast<const CddsSubscription *>(sub->data);
|
const auto cddssub = static_cast<const CddsSubscription *>(sub->data);
|
||||||
|
@ -2319,10 +2368,10 @@ extern "C" rmw_subscription_t * rmw_create_subscription(
|
||||||
{
|
{
|
||||||
static_cast<void>(common->graph_cache.dissociate_reader(
|
static_cast<void>(common->graph_cache.dissociate_reader(
|
||||||
cddssub->gid, common->gid, node->name, node->namespace_));
|
cddssub->gid, common->gid, node->name, node->namespace_));
|
||||||
static_cast<void>(destroy_subscription(sub));
|
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
cleanup_subscription.cancel();
|
||||||
return sub;
|
return sub;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2344,39 +2393,62 @@ extern "C" rmw_ret_t rmw_subscription_get_actual_qos(
|
||||||
const rmw_subscription_t * subscription,
|
const rmw_subscription_t * subscription,
|
||||||
rmw_qos_profile_t * qos)
|
rmw_qos_profile_t * qos)
|
||||||
{
|
{
|
||||||
RET_NULL(qos);
|
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
|
||||||
RET_WRONG_IMPLID(subscription);
|
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
|
||||||
|
subscription,
|
||||||
|
subscription->implementation_identifier,
|
||||||
|
eclipse_cyclonedds_identifier,
|
||||||
|
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
|
||||||
|
|
||||||
auto sub = static_cast<CddsSubscription *>(subscription->data);
|
auto sub = static_cast<CddsSubscription *>(subscription->data);
|
||||||
if (get_readwrite_qos(sub->enth, qos)) {
|
if (get_readwrite_qos(sub->enth, qos)) {
|
||||||
return RMW_RET_OK;
|
return RMW_RET_OK;
|
||||||
} else {
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
}
|
||||||
|
return RMW_RET_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription)
|
static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription)
|
||||||
{
|
{
|
||||||
RET_WRONG_IMPLID(subscription);
|
rmw_ret_t ret = RMW_RET_OK;
|
||||||
auto sub = static_cast<CddsSubscription *>(subscription->data);
|
auto sub = static_cast<CddsSubscription *>(subscription->data);
|
||||||
if (sub != nullptr) {
|
|
||||||
clean_waitset_caches();
|
clean_waitset_caches();
|
||||||
if (dds_delete(sub->rdcondh) < 0) {
|
if (dds_delete(sub->rdcondh) < 0) {
|
||||||
RMW_SET_ERROR_MSG("failed to delete readcondition");
|
RMW_SET_ERROR_MSG("failed to delete readcondition");
|
||||||
|
ret = RMW_RET_ERROR;
|
||||||
}
|
}
|
||||||
if (dds_delete(sub->enth) < 0) {
|
if (dds_delete(sub->enth) < 0) {
|
||||||
|
if (RMW_RET_OK == ret) {
|
||||||
RMW_SET_ERROR_MSG("failed to delete reader");
|
RMW_SET_ERROR_MSG("failed to delete reader");
|
||||||
|
ret = RMW_RET_ERROR;
|
||||||
|
} else {
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR("failed to delete reader\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
delete sub;
|
delete sub;
|
||||||
}
|
|
||||||
rmw_free(const_cast<char *>(subscription->topic_name));
|
rmw_free(const_cast<char *>(subscription->topic_name));
|
||||||
subscription->topic_name = nullptr;
|
|
||||||
rmw_subscription_free(subscription);
|
rmw_subscription_free(subscription);
|
||||||
return RMW_RET_OK;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
|
extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
|
||||||
{
|
{
|
||||||
RET_WRONG_IMPLID(node);
|
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
|
||||||
|
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
|
||||||
|
node,
|
||||||
|
node->implementation_identifier,
|
||||||
|
eclipse_cyclonedds_identifier,
|
||||||
|
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
|
||||||
|
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
|
||||||
|
subscription,
|
||||||
|
subscription->implementation_identifier,
|
||||||
|
eclipse_cyclonedds_identifier,
|
||||||
|
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
|
||||||
|
|
||||||
|
rmw_ret_t ret = RMW_RET_OK;
|
||||||
|
rmw_error_state_t error_state;
|
||||||
|
rmw_error_string_t error_string;
|
||||||
{
|
{
|
||||||
auto common = &node->context->impl->common;
|
auto common = &node->context->impl->common;
|
||||||
const auto cddssub = static_cast<const CddsSubscription *>(subscription->data);
|
const auto cddssub = static_cast<const CddsSubscription *>(subscription->data);
|
||||||
|
@ -2385,12 +2457,26 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscriptio
|
||||||
common->graph_cache.dissociate_writer(
|
common->graph_cache.dissociate_writer(
|
||||||
cddssub->gid, common->gid, node->name,
|
cddssub->gid, common->gid, node->name,
|
||||||
node->namespace_);
|
node->namespace_);
|
||||||
if (RMW_RET_OK != rmw_publish(common->pub, static_cast<void *>(&msg), nullptr)) {
|
ret = rmw_publish(common->pub, static_cast<void *>(&msg), nullptr);
|
||||||
RMW_SET_ERROR_MSG(
|
if (RMW_RET_OK != ret) {
|
||||||
"failed to publish ParticipantEntitiesInfo message after dissociating reader");
|
error_state = *rmw_get_error_state();
|
||||||
|
error_string = rmw_get_error_string();
|
||||||
|
rmw_reset_error();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return destroy_subscription(subscription);
|
|
||||||
|
rmw_ret_t local_ret = destroy_subscription(subscription);
|
||||||
|
if (RMW_RET_OK != local_ret) {
|
||||||
|
if (RMW_RET_OK != ret) {
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(error_string.str);
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "'\n");
|
||||||
|
}
|
||||||
|
ret = local_ret;
|
||||||
|
} else if (RMW_RET_OK != ret) {
|
||||||
|
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static rmw_ret_t rmw_take_int(
|
static rmw_ret_t rmw_take_int(
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue