Lost service responses (#183, #74) (#187) (#209)

* Block rmw_send_response if response reader unknown

The client checks using rmw_service_server_is_available whether the
request it sends will be delivered to the service, but that does not
imply that the (independent, as far as DDS is concerned) response reader
of the client has been discovered by the service.  Usually that will be
the case, but there is no guarantee.
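
To illustrate (a sketch of the DDS entities involved, not part of the
change): a client/service pair involves two independent DDS endpoint
matches, and only the first is covered by the availability check:

    client                            service
    request writer   --matched-->     request reader    (checked by rmw_service_server_is_available)
    response reader  <--matched--     response writer   (reverse discovery, unchecked)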

Ideally DDS would offer an interface that allows checking the reverse
discovery, but that does not yet exist in either the specification or
Cyclone.  This commit works around that by delaying publishing the
response until the number of request writers matches the number of
response readers.

Signed-off-by: Erik Boasson <eb@ilities.com>

* Change request headers to use rmw_request_id_t on the wire

Signed-off-by: Erik Boasson <eb@ilities.com>

* Precise check for matched client/service

Assign a unique identifier to each client/service on creation, add it
to the USER_DATA QoS of the reader and writer and use it for the request
ids.  This allows:

* rmw_service_server_is_available to only return true once it has
  discovered a reader/writer pair belonging to a single service (rather
  than a reader from one service and a writer from another); and
* rmw_send_response to block until it has discovered the requesting
  client's response reader and to abandon the operation when the client
  has disappeared.

The USER_DATA is formatted in the same manner as the participant
USER_DATA; it uses the keys "serviceid" and "clientid".
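
For example, a hypothetical client id would be advertised as follows
(the byte values here are made up; the "key=value;" framing and the
dot-separated hex encoding follow the participant USER_DATA convention):

    clientid=1.10.2f.5.0.0.0.1.0.0.0.c.0.0.0.1;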

This is still but a workaround for the lack of a mechanism in DDS to
ensure that the response reader has been discovered by the request
writer prior to sending the request.

Signed-off-by: Erik Boasson <eb@ilities.com>

* Address review comments

Signed-off-by: Erik Boasson <eb@ilities.com>

* Backwards compatibility

* Revert commit fb040c5db6c05be7698f05969f9bb48b8740f0fe to retain the
  old wire representation;

* Embed the publication_handle of the request inside rmw_request_id_t,
  possible because reverting to the old wire representation frees up
  enough space, and use this in rmw_send_response to check for the
  presence of the client's reader (see the sketch after this list);

* Clients and services without a client/service id in the reader/writer
  user data are treated as fully matched at all times.
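
As a condensed sketch of the layout this relies on (derived from the
static_asserts in the diff below, not new API): the 16-byte writer_guid
of an rmw_request_id_t now carries the 8-byte request GUID of the old
wire format followed by the 8-byte DDS publication handle of the
requesting writer, so rmw_send_response can recover both:

    cdds_request_header_t header;   // header.guid: 8-byte request GUID (old wire format)
    dds_instance_handle_t reqwrih;  // 8-byte publication handle of the client's request writer
    static_assert(
      sizeof(request_header->writer_guid) == sizeof(header.guid) + sizeof(reqwrih),
      "request header size assumptions not met");
    memcpy(&header.guid, request_header->writer_guid, sizeof(header.guid));
    memcpy(&reqwrih, request_header->writer_guid + sizeof(header.guid), sizeof(reqwrih));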

* Replace ERROR by FAILURE because of windows.h

Signed-off-by: Erik Boasson <eb@ilities.com>

* Timeout rmw_send_response after waiting 100ms for discovery

The discovery will eventually result in the client's reader being known
or its writer no longer being known, so a timeout is not necessary for
correctness.  However, if it ever were to block for a longish
time (which is possible in the face of network failures), returning a
timeout to the caller is expected to result in less confusion.

Signed-off-by: Erik Boasson <eb@ilities.com>

* Make iterators "const auto &"

Signed-off-by: Erik Boasson <eb@ilities.com>

* Add TODO for eliminating rmw_send_response blocking

Signed-off-by: Erik Boasson <eb@ilities.com>

Co-authored-by: eboasson <eb@ilities.com>
Jacob Perron 2020-07-21 16:19:35 -07:00, committed by GitHub
commit 5333ab6be7
parent 368cac9420
@@ -16,6 +16,8 @@
 #include <unordered_map>
 #include <unordered_set>
 #include <algorithm>
+#include <chrono>
+#include <iomanip>
 #include <map>
 #include <set>
 #include <functional>
@@ -75,6 +77,8 @@
 #include "serdata.hpp"
 #include "demangle.hpp"
 
+using namespace std::literals::chrono_literals;
+
 /* Security must be enabled when compiling and requires cyclone to support QOS property lists */
 #if DDS_HAS_SECURITY && DDS_HAS_PROPERTY_LIST_QOS
 #define RMW_SUPPORT_SECURITY 1
@@ -244,6 +248,7 @@ struct rmw_context_impl_t
   rmw_dds_common::Context common;
   dds_domainid_t domain_id;
   dds_entity_t ppant;
+  rmw_gid_t ppant_gid;
 
   /* handles for built-in topic readers */
   dds_entity_t rd_participant;
@@ -258,8 +263,12 @@ struct rmw_context_impl_t
   size_t node_count{0};
   std::mutex initialization_mutex;
 
+  /* suffix for GUIDs to construct unique client/service ids
+     (protected by initialization_mutex) */
+  uint32_t client_service_id;
+
   rmw_context_impl_t()
-  : common(), domain_id(UINT32_MAX), ppant(0)
+  : common(), domain_id(UINT32_MAX), ppant(0), client_service_id(0)
   {
     /* destructor relies on these being initialized properly */
     common.thread_is_running.store(false);
@@ -309,10 +318,18 @@ struct CddsSubscription : CddsEntity
   dds_entity_t rdcondh;
 };
 
+struct client_service_id_t
+{
+  // strangely, the writer_guid in an rmw_request_id_t is smaller than the identifier in
+  // an rmw_gid_t
+  uint8_t data[sizeof((reinterpret_cast<rmw_request_id_t *>(0))->writer_guid)];  // NOLINT
+};
+
 struct CddsCS
 {
   CddsPublisher * pub;
   CddsSubscription * sub;
+  client_service_id_t id;
 };
 
 struct CddsClient
@@ -490,6 +507,32 @@ static void get_entity_gid(dds_entity_t h, rmw_gid_t & gid)
   convert_guid_to_gid(guid, gid);
 }
 
+static std::map<std::string, std::vector<uint8_t>> parse_user_data(const dds_qos_t * qos)
+{
+  std::map<std::string, std::vector<uint8_t>> map;
+  void * ud;
+  size_t udsz;
+  if (dds_qget_userdata(qos, &ud, &udsz)) {
+    std::vector<uint8_t> udvec(static_cast<uint8_t *>(ud), static_cast<uint8_t *>(ud) + udsz);
+    dds_free(ud);
+    map = rmw::impl::cpp::parse_key_value(udvec);
+  }
+  return map;
+}
+
+static bool get_user_data_key(const dds_qos_t * qos, const std::string key, std::string & value)
+{
+  if (qos != nullptr) {
+    auto map = parse_user_data(qos);
+    auto name_found = map.find(key);
+    if (name_found != map.end()) {
+      value = std::string(name_found->second.begin(), name_found->second.end());
+      return true;
+    }
+  }
+  return false;
+}
+
 static void handle_ParticipantEntitiesInfo(dds_entity_t reader, void * arg)
 {
   static_cast<void>(reader);
@@ -516,18 +559,9 @@ static void handle_DCPSParticipant(dds_entity_t reader, void * arg)
     } else if (si.instance_state != DDS_ALIVE_INSTANCE_STATE) {
       impl->common.graph_cache.remove_participant(gid);
     } else if (si.valid_data) {
-      void * ud;
-      size_t udsz;
-      if (dds_qget_userdata(s->qos, &ud, &udsz)) {
-        std::vector<uint8_t> udvec(static_cast<uint8_t *>(ud), static_cast<uint8_t *>(ud) + udsz);
-        dds_free(ud);
-        auto map = rmw::impl::cpp::parse_key_value(udvec);
-        auto name_found = map.find("enclave");
-        if (name_found != map.end()) {
-          auto enclave =
-            std::string(name_found->second.begin(), name_found->second.end());
-          impl->common.graph_cache.add_participant(gid, enclave);
-        }
+      std::string enclave;
+      if (get_user_data_key(s->qos, "enclave", enclave)) {
+        impl->common.graph_cache.add_participant(gid, enclave);
       }
     }
     dds_return_loan(reader, &raw, 1);
@@ -925,6 +959,7 @@ rmw_context_impl_t::init(rmw_init_options_t * options)
       "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant");
     return RMW_RET_ERROR;
   }
+  get_entity_gid(this->ppant, this->ppant_gid);
 
   /* Create readers for DDS built-in topics for monitoring discovery */
   if ((this->rd_participant =
@@ -3022,6 +3057,66 @@ extern "C" rmw_ret_t rmw_wait(
 ///////////                                                                 ///////////
 /////////////////////////////////////////////////////////////////////////////////////////
 
+using get_matched_endpoints_fn_t = dds_return_t (*)(
+  dds_entity_t h,
+  dds_instance_handle_t * xs, size_t nxs);
+using BuiltinTopicEndpoint = std::unique_ptr<dds_builtintopic_endpoint_t,
+    std::function<void (dds_builtintopic_endpoint_t *)>>;
+
+static rmw_ret_t get_matched_endpoints(
+  dds_entity_t h, get_matched_endpoints_fn_t fn, std::vector<dds_instance_handle_t> & res)
+{
+  dds_return_t ret;
+  if ((ret = fn(h, res.data(), res.size())) < 0) {
+    return RMW_RET_ERROR;
+  }
+  while ((size_t) ret >= res.size()) {
+    // 128 is a completely arbitrary margin to reduce the risk of having to retry
+    // when matches are created/deleted in parallel
+    res.resize((size_t) ret + 128);
+    if ((ret = fn(h, res.data(), res.size())) < 0) {
+      return RMW_RET_ERROR;
+    }
+  }
+  res.resize((size_t) ret);
+  return RMW_RET_OK;
+}
+
+static void free_builtintopic_endpoint(dds_builtintopic_endpoint_t * e)
+{
+  dds_delete_qos(e->qos);
+  dds_free(e->topic_name);
+  dds_free(e->type_name);
+  dds_free(e);
+}
+
+static BuiltinTopicEndpoint get_matched_subscription_data(
+  dds_entity_t writer, dds_instance_handle_t readerih)
+{
+  BuiltinTopicEndpoint ep(dds_get_matched_subscription_data(writer, readerih),
+    free_builtintopic_endpoint);
+  return ep;
+}
+
+static BuiltinTopicEndpoint get_matched_publication_data(
+  dds_entity_t reader, dds_instance_handle_t writerih)
+{
+  BuiltinTopicEndpoint ep(dds_get_matched_publication_data(reader, writerih),
+    free_builtintopic_endpoint);
+  return ep;
+}
+
+static const std::string csid_to_string(const client_service_id_t & id)
+{
+  std::ostringstream os;
+  os << std::hex;
+  os << std::setw(2) << static_cast<int>(id.data[0]);
+  for (size_t i = 1; i < sizeof(id.data); i++) {
+    os << "." << static_cast<int>(id.data[i]);
+  }
+  return os.str();
+}
+
 static rmw_ret_t rmw_take_response_request(
   CddsCS * cs, rmw_service_info_t * request_header,
   void * ros_data, bool * taken, dds_time_t * source_timestamp,
@@ -3036,9 +3131,16 @@ static rmw_ret_t rmw_take_response_request(
   void * wrap_ptr = static_cast<void *>(&wrap);
   while (dds_take(cs->sub->enth, &wrap_ptr, &info, 1, 1) == 1) {
     if (info.valid_data) {
-      memset(request_header, 0, sizeof(wrap.header));
-      assert(sizeof(wrap.header.guid) <= sizeof(request_header->request_id.writer_guid));
-      memcpy(request_header->request_id.writer_guid, &wrap.header.guid, sizeof(wrap.header.guid));
+      static_assert(
+        sizeof(request_header->request_id.writer_guid) ==
+        sizeof(wrap.header.guid) + sizeof(info.publication_handle),
+        "request header size assumptions not met");
+      memcpy(
+        static_cast<void *>(request_header->request_id.writer_guid),
+        static_cast<const void *>(&wrap.header.guid), sizeof(wrap.header.guid));
+      memcpy(
+        static_cast<void *>(request_header->request_id.writer_guid + sizeof(wrap.header.guid)),
+        static_cast<const void *>(&info.publication_handle), sizeof(info.publication_handle));
       request_header->request_id.sequence_number = wrap.header.seq;
       request_header->source_timestamp = info.source_timestamp;
       // TODO(iluetkeb) replace with real received timestamp when available in cyclone
@@ -3110,7 +3212,9 @@ extern "C" rmw_ret_t rmw_take_request(
 {
   RET_WRONG_IMPLID(service);
   auto info = static_cast<CddsService *>(service->data);
-  return rmw_take_response_request(&info->service, request_header, ros_request, taken, nullptr, 0);
+  return rmw_take_response_request(
+    &info->service, request_header, ros_request, taken, nullptr,
+    false);
 }
 
 static rmw_ret_t rmw_send_response_request(
@@ -3126,6 +3230,56 @@ static rmw_ret_t rmw_send_response_request(
   }
 }
 
+enum class client_present_t
+{
+  FAILURE,  // an error occurred when checking
+  MAYBE,    // reader not matched, writer still present
+  YES,      // reader matched
+  GONE      // neither reader nor writer
+};
+
+static bool check_client_service_endpoint(
+  const dds_builtintopic_endpoint_t * ep,
+  const std::string key, const std::string needle)
+{
+  if (ep != nullptr) {
+    std::string clientid;
+    get_user_data_key(ep->qos, key, clientid);
+    return clientid == needle;
+  }
+  return false;
+}
+
+static client_present_t check_for_response_reader(
+  const CddsCS & service,
+  const dds_instance_handle_t reqwrih)
+{
+  auto reqwr = get_matched_publication_data(service.sub->enth, reqwrih);
+  std::string clientid;
+  if (reqwr == nullptr) {
+    return client_present_t::GONE;
+  } else if (!get_user_data_key(reqwr->qos, "clientid", clientid)) {
+    // backwards-compatibility: a client without a client id, assume all is well
+    return client_present_t::YES;
+  } else {
+    // look for this client's reader: if we have matched it, all is well;
+    // if not, continue waiting
+    std::vector<dds_instance_handle_t> rds;
+    if (get_matched_endpoints(service.pub->enth, dds_get_matched_subscriptions, rds) < 0) {
+      RMW_SET_ERROR_MSG("rmw_send_response: failed to get reader/writer matches");
+      return client_present_t::FAILURE;
+    }
+    // if we have matched this client's reader, all is well
+    for (const auto & rdih : rds) {
+      auto rd = get_matched_subscription_data(service.pub->enth, rdih);
+      if (check_client_service_endpoint(rd.get(), "clientid", clientid)) {
+        return client_present_t::YES;
+      }
+    }
+    return client_present_t::MAYBE;
+  }
+}
+
 extern "C" rmw_ret_t rmw_send_response(
   const rmw_service_t * service,
   rmw_request_id_t * request_header, void * ros_response)
@@ -3135,9 +3289,43 @@ extern "C" rmw_ret_t rmw_send_response(
   RET_NULL(ros_response);
   CddsService * info = static_cast<CddsService *>(service->data);
   cdds_request_header_t header;
-  memcpy(&header.guid, request_header->writer_guid, sizeof(header.guid));
+  dds_instance_handle_t reqwrih;
+  static_assert(
+    sizeof(request_header->writer_guid) == sizeof(header.guid) + sizeof(reqwrih),
+    "request header size assumptions not met");
+  memcpy(
+    static_cast<void *>(&header.guid), static_cast<const void *>(request_header->writer_guid),
+    sizeof(header.guid));
+  memcpy(
+    static_cast<void *>(&reqwrih),
+    static_cast<const void *>(request_header->writer_guid + sizeof(header.guid)), sizeof(reqwrih));
   header.seq = request_header->sequence_number;
-  return rmw_send_response_request(&info->service, header, ros_response);
+  // Block until the response reader has been matched by the response writer (this is a
+  // workaround: rmw_service_server_is_available should keep returning false until this
+  // is a given).
+  // TODO(eboasson): rmw_service_server_is_available should block the request instead (#191)
+  client_present_t st;
+  std::chrono::system_clock::time_point tnow = std::chrono::system_clock::now();
+  std::chrono::system_clock::time_point tend = tnow + 100ms;
+  while ((st =
+    check_for_response_reader(
+      info->service,
+      reqwrih)) == client_present_t::MAYBE && tnow < tend)
+  {
+    dds_sleepfor(DDS_MSECS(10));
+    tnow = std::chrono::system_clock::now();
+  }
+  switch (st) {
+    case client_present_t::FAILURE:
+      break;
+    case client_present_t::MAYBE:
+      return RMW_RET_TIMEOUT;
+    case client_present_t::YES:
+      return rmw_send_response_request(&info->service, header, ros_response);
+    case client_present_t::GONE:
+      return RMW_RET_OK;
+  }
+  return RMW_RET_ERROR;
 }
 
 extern "C" rmw_ret_t rmw_send_request(
@@ -3185,6 +3373,31 @@ static const rosidl_service_type_support_t * get_service_typesupport(
   }
 }
 
+static void get_unique_csid(const rmw_node_t * node, client_service_id_t & id)
+{
+  auto impl = node->context->impl;
+  static_assert(
+    sizeof(dds_guid_t) <= sizeof(id.data),
+    "client/service id assumed it can hold a DDSI GUID");
+  static_assert(
+    sizeof(dds_guid_t) <= sizeof((reinterpret_cast<rmw_gid_t *>(0))->data),
+    "client/service id assumes rmw_gid_t can hold a DDSI GUID");
+
+  uint32_t x;
+  {
+    std::lock_guard<std::mutex> guard(impl->initialization_mutex);
+    x = ++impl->client_service_id;
+  }
+
+  // construct the id by taking the entity prefix (which is just the first 12
+  // bytes of the GID, which itself is just the GUID padded with 0's), then
+  // overwriting the entity id with the big-endian counter value
+  memcpy(id.data, impl->ppant_gid.data, 12);
+  for (size_t i = 0, s = 24; i < 4; i++, s -= 8) {
+    id.data[12 + i] = static_cast<uint8_t>(x >> s);
+  }
+}
+
 static rmw_ret_t rmw_init_cs(
   CddsCS * cs, const rmw_node_t * node,
   const rosidl_service_type_support_t * type_supports,
@@ -3262,6 +3475,16 @@ static rmw_ret_t rmw_init_cs(
   }
   dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1));
   dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
+  // store a unique identifier for this client/service in the user
+  // data of the reader and writer so that we can always determine
+  // which pairs belong together
+  get_unique_csid(node, cs->id);
+  {
+    std::string user_data = std::string(is_service ? "serviceid=" : "clientid=") + csid_to_string(
+      cs->id) + std::string(";");
+    dds_qset_userdata(qos, user_data.c_str(), user_data.size());
+  }
+
   if ((pub->enth = dds_create_writer(node->context->impl->dds_pub, pubtopic, qos, nullptr)) < 0) {
     RMW_SET_ERROR_MSG("failed to create writer");
     goto fail_writer;
@@ -3599,6 +3822,49 @@ static rmw_ret_t get_topic_name(dds_entity_t endpoint_handle, std::string & name)
   } while (true);
 }
 
+static rmw_ret_t check_for_service_reader_writer(const CddsCS & client, bool * is_available)
+{
+  std::vector<dds_instance_handle_t> rds, wrs;
+  assert(is_available != nullptr && !*is_available);
+  if (get_matched_endpoints(client.pub->enth, dds_get_matched_subscriptions, rds) < 0 ||
+    get_matched_endpoints(client.sub->enth, dds_get_matched_publications, wrs) < 0)
+  {
+    RMW_SET_ERROR_MSG("rmw_service_server_is_available: failed to get reader/writer matches");
+    return RMW_RET_ERROR;
+  }
+  // first extract all service ids from matched readers
+  std::set<std::string> needles;
+  for (const auto & rdih : rds) {
+    auto rd = get_matched_subscription_data(client.pub->enth, rdih);
+    std::string serviceid;
+    if (rd && get_user_data_key(rd->qos, "serviceid", serviceid)) {
+      needles.insert(serviceid);
+    }
+  }
+  if (needles.empty()) {
+    // if no services advertising a serviceid have been matched, but there
+    // are matched request readers and response writers, then we fall back
+    // to the old method of simply requiring the existence of matches.
+    *is_available = !rds.empty() && !wrs.empty();
+  } else {
+    // scan the writers to see if there is at least one response writer
+    // matching a discovered request reader
+    for (const auto & wrih : wrs) {
+      auto wr = get_matched_publication_data(client.sub->enth, wrih);
+      std::string serviceid;
+      if (wr &&
+        get_user_data_key(
+          wr->qos, "serviceid",
+          serviceid) && needles.find(serviceid) != needles.end())
+      {
+        *is_available = true;
+        break;
+      }
+    }
+  }
+  return RMW_RET_OK;
+}
+
 extern "C" rmw_ret_t rmw_service_server_is_available(
   const rmw_node_t * node,
   const rmw_client_t * client,
@@ -3632,16 +3898,7 @@ extern "C" rmw_ret_t rmw_service_server_is_available(
   if (ret != RMW_RET_OK || 0 == number_of_response_publishers) {
     return ret;
   }
-  dds_publication_matched_status_t ps;
-  dds_subscription_matched_status_t cs;
-  if (dds_get_publication_matched_status(info->client.pub->enth, &ps) < 0 ||
-    dds_get_subscription_matched_status(info->client.sub->enth, &cs) < 0)
-  {
-    RMW_SET_ERROR_MSG("rmw_service_server_is_available: get_..._matched_status failed");
-    return RMW_RET_ERROR;
-  }
-  *is_available = ps.current_count > 0 && cs.current_count > 0;
-  return RMW_RET_OK;
+  return check_for_service_reader_writer(info->client, is_available);
 }
 
 extern "C" rmw_ret_t rmw_count_publishers(