Ensure compliant publisher API (#210)
Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
This commit is contained in:
parent
3d03bf6c53
commit
398812c12d
1 changed files with 128 additions and 41 deletions
|
@ -1593,7 +1593,6 @@ static dds_qos_t * create_readwrite_qos(
|
||||||
dds_qset_writer_data_lifecycle(qos, false); /* disable autodispose */
|
dds_qset_writer_data_lifecycle(qos, false); /* disable autodispose */
|
||||||
switch (qos_policies->history) {
|
switch (qos_policies->history) {
|
||||||
case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT:
|
case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT:
|
||||||
case RMW_QOS_POLICY_HISTORY_UNKNOWN:
|
|
||||||
case RMW_QOS_POLICY_HISTORY_KEEP_LAST:
|
case RMW_QOS_POLICY_HISTORY_KEEP_LAST:
|
||||||
if (qos_policies->depth == RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT) {
|
if (qos_policies->depth == RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT) {
|
||||||
dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1);
|
dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1);
|
||||||
|
@ -1609,24 +1608,26 @@ static dds_qos_t * create_readwrite_qos(
|
||||||
case RMW_QOS_POLICY_HISTORY_KEEP_ALL:
|
case RMW_QOS_POLICY_HISTORY_KEEP_ALL:
|
||||||
dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
|
dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
|
||||||
break;
|
break;
|
||||||
|
case RMW_QOS_POLICY_HISTORY_UNKNOWN:
|
||||||
|
return nullptr;
|
||||||
default:
|
default:
|
||||||
rmw_cyclonedds_cpp::unreachable();
|
rmw_cyclonedds_cpp::unreachable();
|
||||||
}
|
}
|
||||||
switch (qos_policies->reliability) {
|
switch (qos_policies->reliability) {
|
||||||
case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT:
|
case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT:
|
||||||
case RMW_QOS_POLICY_RELIABILITY_UNKNOWN:
|
|
||||||
case RMW_QOS_POLICY_RELIABILITY_RELIABLE:
|
case RMW_QOS_POLICY_RELIABILITY_RELIABLE:
|
||||||
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
|
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
|
||||||
break;
|
break;
|
||||||
case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT:
|
case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT:
|
||||||
dds_qset_reliability(qos, DDS_RELIABILITY_BEST_EFFORT, 0);
|
dds_qset_reliability(qos, DDS_RELIABILITY_BEST_EFFORT, 0);
|
||||||
break;
|
break;
|
||||||
|
case RMW_QOS_POLICY_RELIABILITY_UNKNOWN:
|
||||||
|
return nullptr;
|
||||||
default:
|
default:
|
||||||
rmw_cyclonedds_cpp::unreachable();
|
rmw_cyclonedds_cpp::unreachable();
|
||||||
}
|
}
|
||||||
switch (qos_policies->durability) {
|
switch (qos_policies->durability) {
|
||||||
case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT:
|
case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT:
|
||||||
case RMW_QOS_POLICY_DURABILITY_UNKNOWN:
|
|
||||||
case RMW_QOS_POLICY_DURABILITY_VOLATILE:
|
case RMW_QOS_POLICY_DURABILITY_VOLATILE:
|
||||||
dds_qset_durability(qos, DDS_DURABILITY_VOLATILE);
|
dds_qset_durability(qos, DDS_DURABILITY_VOLATILE);
|
||||||
break;
|
break;
|
||||||
|
@ -1643,6 +1644,8 @@ static dds_qos_t * create_readwrite_qos(
|
||||||
DDS_LENGTH_UNLIMITED);
|
DDS_LENGTH_UNLIMITED);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case RMW_QOS_POLICY_DURABILITY_UNKNOWN:
|
||||||
|
return nullptr;
|
||||||
default:
|
default:
|
||||||
rmw_cyclonedds_cpp::unreachable();
|
rmw_cyclonedds_cpp::unreachable();
|
||||||
}
|
}
|
||||||
|
@ -1664,12 +1667,13 @@ static dds_qos_t * create_readwrite_qos(
|
||||||
switch (qos_policies->liveliness) {
|
switch (qos_policies->liveliness) {
|
||||||
case RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT:
|
case RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT:
|
||||||
case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC:
|
case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC:
|
||||||
case RMW_QOS_POLICY_LIVELINESS_UNKNOWN:
|
|
||||||
dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, ldur);
|
dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, ldur);
|
||||||
break;
|
break;
|
||||||
case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC:
|
case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC:
|
||||||
dds_qset_liveliness(qos, DDS_LIVELINESS_MANUAL_BY_TOPIC, ldur);
|
dds_qset_liveliness(qos, DDS_LIVELINESS_MANUAL_BY_TOPIC, ldur);
|
||||||
break;
|
break;
|
||||||
|
case RMW_QOS_POLICY_LIVELINESS_UNKNOWN:
|
||||||
|
return nullptr;
|
||||||
default:
|
default:
|
||||||
rmw_cyclonedds_cpp::unreachable();
|
rmw_cyclonedds_cpp::unreachable();
|
||||||
}
|
}
|
||||||
|
@ -1907,33 +1911,40 @@ static rmw_publisher_t * create_publisher(
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
CddsPublisher * pub;
|
CddsPublisher * pub;
|
||||||
rmw_publisher_t * rmw_publisher;
|
|
||||||
if ((pub =
|
if ((pub =
|
||||||
create_cdds_publisher(
|
create_cdds_publisher(
|
||||||
dds_ppant, dds_pub, type_supports, topic_name,
|
dds_ppant, dds_pub, type_supports, topic_name,
|
||||||
qos_policies)) == nullptr)
|
qos_policies)) == nullptr)
|
||||||
{
|
{
|
||||||
goto fail_common_init;
|
return nullptr;
|
||||||
}
|
}
|
||||||
rmw_publisher = rmw_publisher_allocate();
|
auto cleanup_cdds_publisher = rcpputils::make_scope_exit(
|
||||||
RET_ALLOC_X(rmw_publisher, goto fail_publisher);
|
[pub]() {
|
||||||
|
if (dds_delete(pub->enth) < 0) {
|
||||||
|
RCUTILS_LOG_ERROR_NAMED(
|
||||||
|
"rmw_cyclonedds_cpp", "failed to delete writer during error handling");
|
||||||
|
}
|
||||||
|
delete pub;
|
||||||
|
});
|
||||||
|
|
||||||
|
rmw_publisher_t * rmw_publisher = rmw_publisher_allocate();
|
||||||
|
RET_ALLOC_X(rmw_publisher, return nullptr);
|
||||||
|
auto cleanup_rmw_publisher = rcpputils::make_scope_exit(
|
||||||
|
[rmw_publisher]() {
|
||||||
|
rmw_free(const_cast<char *>(rmw_publisher->topic_name));
|
||||||
|
rmw_publisher_free(rmw_publisher);
|
||||||
|
});
|
||||||
rmw_publisher->implementation_identifier = eclipse_cyclonedds_identifier;
|
rmw_publisher->implementation_identifier = eclipse_cyclonedds_identifier;
|
||||||
rmw_publisher->data = pub;
|
rmw_publisher->data = pub;
|
||||||
rmw_publisher->topic_name = reinterpret_cast<char *>(rmw_allocate(strlen(topic_name) + 1));
|
rmw_publisher->topic_name = reinterpret_cast<char *>(rmw_allocate(strlen(topic_name) + 1));
|
||||||
RET_ALLOC_X(rmw_publisher->topic_name, goto fail_topic_name);
|
RET_ALLOC_X(rmw_publisher->topic_name, return nullptr);
|
||||||
memcpy(const_cast<char *>(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1);
|
memcpy(const_cast<char *>(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1);
|
||||||
rmw_publisher->options = *publisher_options;
|
rmw_publisher->options = *publisher_options;
|
||||||
rmw_publisher->can_loan_messages = false;
|
rmw_publisher->can_loan_messages = false;
|
||||||
|
|
||||||
|
cleanup_rmw_publisher.cancel();
|
||||||
|
cleanup_cdds_publisher.cancel();
|
||||||
return rmw_publisher;
|
return rmw_publisher;
|
||||||
fail_topic_name:
|
|
||||||
rmw_publisher_free(rmw_publisher);
|
|
||||||
fail_publisher:
|
|
||||||
if (dds_delete(pub->enth) < 0) {
|
|
||||||
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling");
|
|
||||||
}
|
|
||||||
delete pub;
|
|
||||||
fail_common_init:
|
|
||||||
return nullptr;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" rmw_publisher_t * rmw_create_publisher(
|
extern "C" rmw_publisher_t * rmw_create_publisher(
|
||||||
|
@ -1942,29 +1953,67 @@ extern "C" rmw_publisher_t * rmw_create_publisher(
|
||||||
const rmw_publisher_options_t * publisher_options
|
const rmw_publisher_options_t * publisher_options
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
RET_WRONG_IMPLID_X(node, return nullptr);
|
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
|
||||||
|
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
|
||||||
|
node,
|
||||||
|
node->implementation_identifier,
|
||||||
|
eclipse_cyclonedds_identifier,
|
||||||
|
return nullptr);
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, nullptr);
|
||||||
|
if (0 == strlen(topic_name)) {
|
||||||
|
RMW_SET_ERROR_MSG("topic_name argument is an empty string");
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
|
||||||
|
if (!qos_policies->avoid_ros_namespace_conventions) {
|
||||||
|
int validation_result = RMW_TOPIC_VALID;
|
||||||
|
rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr);
|
||||||
|
if (RMW_RET_OK != ret) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
if (RMW_TOPIC_VALID != validation_result) {
|
||||||
|
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
|
||||||
|
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic name: %s", reason);
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);
|
||||||
|
|
||||||
rmw_publisher_t * pub = create_publisher(
|
rmw_publisher_t * pub = create_publisher(
|
||||||
node->context->impl->ppant, node->context->impl->dds_pub,
|
node->context->impl->ppant, node->context->impl->dds_pub,
|
||||||
type_supports, topic_name, qos_policies,
|
type_supports, topic_name, qos_policies,
|
||||||
publisher_options);
|
publisher_options);
|
||||||
if (pub != nullptr) {
|
if (pub == nullptr) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
auto cleanup_publisher = rcpputils::make_scope_exit(
|
||||||
|
[pub]() {
|
||||||
|
rmw_error_state_t error_state = *rmw_get_error_state();
|
||||||
|
rmw_reset_error();
|
||||||
|
if (RMW_RET_OK != destroy_publisher(pub)) {
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
|
||||||
|
rmw_reset_error();
|
||||||
|
}
|
||||||
|
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
|
||||||
|
});
|
||||||
|
|
||||||
// Update graph
|
// Update graph
|
||||||
auto common = &node->context->impl->common;
|
auto common = &node->context->impl->common;
|
||||||
const auto cddspub = static_cast<const CddsPublisher *>(pub->data);
|
const auto cddspub = static_cast<const CddsPublisher *>(pub->data);
|
||||||
|
{
|
||||||
std::lock_guard<std::mutex> guard(common->node_update_mutex);
|
std::lock_guard<std::mutex> guard(common->node_update_mutex);
|
||||||
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
|
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
|
||||||
common->graph_cache.associate_writer(cddspub->gid, common->gid, node->name, node->namespace_);
|
common->graph_cache.associate_writer(cddspub->gid, common->gid, node->name, node->namespace_);
|
||||||
if (RMW_RET_OK != rmw_publish(
|
if (RMW_RET_OK != rmw_publish(common->pub, static_cast<void *>(&msg), nullptr)) {
|
||||||
common->pub,
|
|
||||||
static_cast<void *>(&msg),
|
|
||||||
nullptr))
|
|
||||||
{
|
|
||||||
static_cast<void>(common->graph_cache.dissociate_writer(
|
static_cast<void>(common->graph_cache.dissociate_writer(
|
||||||
cddspub->gid, common->gid, node->name, node->namespace_));
|
cddspub->gid, common->gid, node->name, node->namespace_));
|
||||||
static_cast<void>(destroy_publisher(pub));
|
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cleanup_publisher.cancel();
|
||||||
return pub;
|
return pub;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2021,14 +2070,18 @@ rmw_ret_t rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher)
|
||||||
|
|
||||||
rmw_ret_t rmw_publisher_get_actual_qos(const rmw_publisher_t * publisher, rmw_qos_profile_t * qos)
|
rmw_ret_t rmw_publisher_get_actual_qos(const rmw_publisher_t * publisher, rmw_qos_profile_t * qos)
|
||||||
{
|
{
|
||||||
RET_NULL(qos);
|
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
|
||||||
RET_WRONG_IMPLID(publisher);
|
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
|
||||||
|
publisher,
|
||||||
|
publisher->implementation_identifier,
|
||||||
|
eclipse_cyclonedds_identifier,
|
||||||
|
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
|
||||||
auto pub = static_cast<CddsPublisher *>(publisher->data);
|
auto pub = static_cast<CddsPublisher *>(publisher->data);
|
||||||
if (get_readwrite_qos(pub->enth, qos)) {
|
if (get_readwrite_qos(pub->enth, qos)) {
|
||||||
return RMW_RET_OK;
|
return RMW_RET_OK;
|
||||||
} else {
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
}
|
||||||
|
return RMW_RET_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" rmw_ret_t rmw_borrow_loaned_message(
|
extern "C" rmw_ret_t rmw_borrow_loaned_message(
|
||||||
|
@ -2056,23 +2109,37 @@ extern "C" rmw_ret_t rmw_return_loaned_message_from_publisher(
|
||||||
|
|
||||||
static rmw_ret_t destroy_publisher(rmw_publisher_t * publisher)
|
static rmw_ret_t destroy_publisher(rmw_publisher_t * publisher)
|
||||||
{
|
{
|
||||||
RET_WRONG_IMPLID(publisher);
|
rmw_ret_t ret = RMW_RET_OK;
|
||||||
auto pub = static_cast<CddsPublisher *>(publisher->data);
|
auto pub = static_cast<CddsPublisher *>(publisher->data);
|
||||||
if (pub != nullptr) {
|
if (pub != nullptr) {
|
||||||
if (dds_delete(pub->enth) < 0) {
|
if (dds_delete(pub->enth) < 0) {
|
||||||
RMW_SET_ERROR_MSG("failed to delete writer");
|
RMW_SET_ERROR_MSG("failed to delete writer");
|
||||||
|
ret = RMW_RET_ERROR;
|
||||||
}
|
}
|
||||||
delete pub;
|
delete pub;
|
||||||
}
|
}
|
||||||
rmw_free(const_cast<char *>(publisher->topic_name));
|
rmw_free(const_cast<char *>(publisher->topic_name));
|
||||||
publisher->topic_name = nullptr;
|
|
||||||
rmw_publisher_free(publisher);
|
rmw_publisher_free(publisher);
|
||||||
return RMW_RET_OK;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
|
extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
|
||||||
{
|
{
|
||||||
RET_WRONG_IMPLID(node);
|
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
|
||||||
|
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
|
||||||
|
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
|
||||||
|
node,
|
||||||
|
node->implementation_identifier,
|
||||||
|
eclipse_cyclonedds_identifier,
|
||||||
|
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
|
||||||
|
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
|
||||||
|
publisher,
|
||||||
|
publisher->implementation_identifier,
|
||||||
|
eclipse_cyclonedds_identifier,
|
||||||
|
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
|
||||||
|
|
||||||
|
rmw_ret_t ret = RMW_RET_OK;
|
||||||
|
rmw_error_state_t error_state;
|
||||||
{
|
{
|
||||||
auto common = &node->context->impl->common;
|
auto common = &node->context->impl->common;
|
||||||
const auto cddspub = static_cast<const CddsPublisher *>(publisher->data);
|
const auto cddspub = static_cast<const CddsPublisher *>(publisher->data);
|
||||||
|
@ -2081,12 +2148,32 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t *
|
||||||
common->graph_cache.dissociate_writer(
|
common->graph_cache.dissociate_writer(
|
||||||
cddspub->gid, common->gid, node->name,
|
cddspub->gid, common->gid, node->name,
|
||||||
node->namespace_);
|
node->namespace_);
|
||||||
if (RMW_RET_OK != rmw_publish(common->pub, static_cast<void *>(&msg), nullptr)) {
|
rmw_ret_t publish_ret =
|
||||||
RMW_SET_ERROR_MSG(
|
rmw_publish(common->pub, static_cast<void *>(&msg), nullptr);
|
||||||
"failed to publish ParticipantEntitiesInfo message after dissociating writer");
|
if (RMW_RET_OK != publish_ret) {
|
||||||
|
error_state = *rmw_get_error_state();
|
||||||
|
ret = publish_ret;
|
||||||
|
rmw_reset_error();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return destroy_publisher(publisher);
|
|
||||||
|
rmw_ret_t inner_ret = destroy_publisher(publisher);
|
||||||
|
if (RMW_RET_OK != inner_ret) {
|
||||||
|
if (RMW_RET_OK != ret) {
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
|
||||||
|
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "'\n");
|
||||||
|
} else {
|
||||||
|
error_state = *rmw_get_error_state();
|
||||||
|
ret = inner_ret;
|
||||||
|
}
|
||||||
|
rmw_reset_error();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (RMW_RET_OK != ret) {
|
||||||
|
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue