Add limited support for QoS changes
This commit adds support for changing all mutable QoS except those that affect reader/writer matching (i.e., deadline, latency budget and partition). This is simply because the recalculation of the matches hasn't been implemented yet, it is not a fundamental limitation. Implementing this basically forced fixing up a bunch of inconsistencies in handling QoS in entity creation. A silly multi-process ping-pong test built on changing the value of user data has been added. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
11a1b9d6f9
commit
a4d8aba4f9
32 changed files with 1225 additions and 361 deletions
|
@ -301,6 +301,7 @@ struct proxy_participant
|
|||
struct addrset *as_meta; /* default address set to use for discovery traffic */
|
||||
struct proxy_endpoint_common *endpoints; /* all proxy endpoints can be reached from here */
|
||||
ddsrt_avl_tree_t groups; /* table of all groups (publisher, subscriber), see struct proxy_group */
|
||||
seqno_t seq; /* sequence number of most recent SPDP message */
|
||||
unsigned kernel_sequence_numbers : 1; /* whether this proxy participant generates OSPL kernel sequence numbers */
|
||||
unsigned implicitly_created : 1; /* participants are implicitly created for Cloud/Fog discovered endpoints */
|
||||
unsigned is_ddsi2_pp: 1; /* if this is the federation-leader on the remote node */
|
||||
|
@ -527,6 +528,7 @@ dds_return_t new_participant (struct nn_guid *ppguid, unsigned flags, const stru
|
|||
* ppguid lookup failed.
|
||||
*/
|
||||
dds_return_t delete_participant (const struct nn_guid *ppguid);
|
||||
void update_participant_plist (struct participant *pp, const struct nn_plist *plist);
|
||||
|
||||
/* To obtain the builtin writer to be used for publishing SPDP, SEDP,
|
||||
PMD stuff for PP and its endpoints, given the entityid. If PP has
|
||||
|
@ -541,6 +543,9 @@ dds_return_t new_writer (struct writer **wr_out, struct nn_guid *wrguid, const s
|
|||
|
||||
dds_return_t new_reader (struct reader **rd_out, struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct dds_qos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg);
|
||||
|
||||
void update_reader_qos (struct reader *rd, const struct dds_qos *xqos);
|
||||
void update_writer_qos (struct writer *wr, const struct dds_qos *xqos);
|
||||
|
||||
struct whc_node;
|
||||
struct whc_state;
|
||||
unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, struct whc_node **deferred_free_list);
|
||||
|
@ -586,7 +591,7 @@ void delete_local_orphan_writer (struct local_orphan_writer *wr);
|
|||
/* Set when this proxy participant is not to be announced on the built-in topics yet */
|
||||
#define CF_PROXYPP_NO_SPDP (1 << 3)
|
||||
|
||||
void new_proxy_participant (const struct nn_guid *guid, unsigned bes, unsigned prismtech_bes, const struct nn_guid *privileged_pp_guid, struct addrset *as_default, struct addrset *as_meta, const struct nn_plist *plist, dds_duration_t tlease_dur, nn_vendorid_t vendor, unsigned custom_flags, nn_wctime_t timestamp);
|
||||
void new_proxy_participant (const struct nn_guid *guid, unsigned bes, unsigned prismtech_bes, const struct nn_guid *privileged_pp_guid, struct addrset *as_default, struct addrset *as_meta, const struct nn_plist *plist, dds_duration_t tlease_dur, nn_vendorid_t vendor, unsigned custom_flags, nn_wctime_t timestamp, seqno_t seq);
|
||||
int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t timestamp, int isimplicit);
|
||||
uint64_t participant_instance_id (const struct nn_guid *guid);
|
||||
|
||||
|
@ -595,8 +600,8 @@ enum update_proxy_participant_source {
|
|||
UPD_PROXYPP_CM
|
||||
};
|
||||
|
||||
int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp);
|
||||
int update_proxy_participant_plist (struct proxy_participant *proxypp, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp);
|
||||
int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, seqno_t seq, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp);
|
||||
int update_proxy_participant_plist (struct proxy_participant *proxypp, seqno_t seq, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp);
|
||||
void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct lease *newlease);
|
||||
|
||||
void purge_proxy_participants (const nn_locator_t *loc, bool delete_from_as_disc);
|
||||
|
@ -618,8 +623,8 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
|
|||
int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit);
|
||||
int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit);
|
||||
|
||||
void update_proxy_reader (struct proxy_reader * prd, struct addrset *as);
|
||||
void update_proxy_writer (struct proxy_writer * pwr, struct addrset *as);
|
||||
void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
|
||||
void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
|
||||
|
||||
int new_proxy_group (const struct nn_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp);
|
||||
void delete_proxy_group (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit);
|
||||
|
|
|
@ -277,6 +277,7 @@ DDS_EXPORT void nn_xqos_init_default_topic (dds_qos_t *xqos);
|
|||
DDS_EXPORT void nn_xqos_copy (dds_qos_t *dst, const dds_qos_t *src);
|
||||
DDS_EXPORT void nn_xqos_unalias (dds_qos_t *xqos);
|
||||
DDS_EXPORT void nn_xqos_fini (dds_qos_t *xqos);
|
||||
DDS_EXPORT void nn_xqos_fini_mask (dds_qos_t *xqos, uint64_t mask);
|
||||
DDS_EXPORT dds_return_t nn_xqos_valid (const dds_qos_t *xqos);
|
||||
DDS_EXPORT void nn_xqos_mergein_missing (dds_qos_t *a, const dds_qos_t *b, uint64_t mask);
|
||||
DDS_EXPORT uint64_t nn_xqos_delta (const dds_qos_t *a, const dds_qos_t *b, uint64_t mask);
|
||||
|
|
|
@ -506,7 +506,7 @@ static void make_participants_dependent_on_ddsi2 (const nn_guid_t *ddsi2guid, nn
|
|||
}
|
||||
}
|
||||
|
||||
static int handle_SPDP_alive (const struct receiver_state *rst, nn_wctime_t timestamp, const nn_plist_t *datap)
|
||||
static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_wctime_t timestamp, const nn_plist_t *datap)
|
||||
{
|
||||
const unsigned bes_sedp_announcer_mask =
|
||||
NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER |
|
||||
|
@ -593,11 +593,14 @@ static int handle_SPDP_alive (const struct receiver_state *rst, nn_wctime_t time
|
|||
DDS_LOG(DDS_LC_TRACE, "SPDP ST0 "PGUIDFMT" (known)", PGUID (datap->participant_guid));
|
||||
lease_renew (ddsrt_atomic_ldvoidp (&proxypp->lease), now_et ());
|
||||
ddsrt_mutex_lock (&proxypp->e.lock);
|
||||
if (proxypp->implicitly_created)
|
||||
if (proxypp->implicitly_created || seq > proxypp->seq)
|
||||
{
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " (NEW was-implicitly-created)");
|
||||
if (proxypp->implicitly_created)
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " (NEW was-implicitly-created)");
|
||||
else
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " (update)");
|
||||
proxypp->implicitly_created = 0;
|
||||
update_proxy_participant_plist_locked (proxypp, datap, UPD_PROXYPP_SPDP, timestamp);
|
||||
update_proxy_participant_plist_locked (proxypp, seq, datap, UPD_PROXYPP_SPDP, timestamp);
|
||||
}
|
||||
ddsrt_mutex_unlock (&proxypp->e.lock);
|
||||
return 0;
|
||||
|
@ -739,7 +742,8 @@ static int handle_SPDP_alive (const struct receiver_state *rst, nn_wctime_t time
|
|||
lease_duration,
|
||||
rst->vendor,
|
||||
custom_flags,
|
||||
timestamp
|
||||
timestamp,
|
||||
seq
|
||||
);
|
||||
|
||||
/* Force transmission of SPDP messages - we're not very careful
|
||||
|
@ -779,7 +783,7 @@ static int handle_SPDP_alive (const struct receiver_state *rst, nn_wctime_t time
|
|||
return 1;
|
||||
}
|
||||
|
||||
static void handle_SPDP (const struct receiver_state *rst, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, uint32_t len)
|
||||
static void handle_SPDP (const struct receiver_state *rst, seqno_t seq, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, uint32_t len)
|
||||
{
|
||||
const struct CDRHeader *data = vdata; /* built-ins not deserialized (yet) */
|
||||
DDS_TRACE("SPDP ST%x", statusinfo);
|
||||
|
@ -809,7 +813,7 @@ static void handle_SPDP (const struct receiver_state *rst, nn_wctime_t timestamp
|
|||
switch (statusinfo & (NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER))
|
||||
{
|
||||
case 0:
|
||||
interesting = handle_SPDP_alive (rst, timestamp, &decoded_data);
|
||||
interesting = handle_SPDP_alive (rst, seq, timestamp, &decoded_data);
|
||||
break;
|
||||
|
||||
case NN_STATUSINFO_DISPOSE:
|
||||
|
@ -1019,7 +1023,7 @@ static const char *durability_to_string (dds_durability_kind_t k)
|
|||
return "undefined-durability";
|
||||
}
|
||||
|
||||
static struct proxy_participant *implicitly_create_proxypp (const nn_guid_t *ppguid, nn_plist_t *datap /* note: potentially modifies datap */, const nn_guid_prefix_t *src_guid_prefix, nn_vendorid_t vendorid, nn_wctime_t timestamp)
|
||||
static struct proxy_participant *implicitly_create_proxypp (const nn_guid_t *ppguid, nn_plist_t *datap /* note: potentially modifies datap */, const nn_guid_prefix_t *src_guid_prefix, nn_vendorid_t vendorid, nn_wctime_t timestamp, seqno_t seq)
|
||||
{
|
||||
nn_guid_t privguid;
|
||||
nn_plist_t pp_plist;
|
||||
|
@ -1056,7 +1060,7 @@ static struct proxy_participant *implicitly_create_proxypp (const nn_guid_t *ppg
|
|||
doing anything about (1). That means we fall back to the legacy mode of locally generating
|
||||
GIDs but leaving the system id unchanged if the remote is OSPL. */
|
||||
actual_vendorid = (datap->present & PP_VENDORID) ? datap->vendorid : vendorid;
|
||||
new_proxy_participant(ppguid, 0, 0, &privguid, new_addrset(), new_addrset(), &pp_plist, T_NEVER, actual_vendorid, CF_IMPLICITLY_CREATED_PROXYPP, timestamp);
|
||||
new_proxy_participant(ppguid, 0, 0, &privguid, new_addrset(), new_addrset(), &pp_plist, T_NEVER, actual_vendorid, CF_IMPLICITLY_CREATED_PROXYPP, timestamp, seq);
|
||||
}
|
||||
else if (ppguid->prefix.u[0] == src_guid_prefix->u[0] && vendor_is_eclipse_or_opensplice (vendorid))
|
||||
{
|
||||
|
@ -1090,7 +1094,7 @@ static struct proxy_participant *implicitly_create_proxypp (const nn_guid_t *ppg
|
|||
ddsrt_mutex_unlock (&privpp->e.lock);
|
||||
|
||||
pp_plist.prismtech_participant_version_info.flags &= ~NN_PRISMTECH_FL_PARTICIPANT_IS_DDSI2;
|
||||
new_proxy_participant (ppguid, 0, 0, &privguid, as_default, as_meta, &pp_plist, T_NEVER, vendorid, CF_IMPLICITLY_CREATED_PROXYPP | CF_PROXYPP_NO_SPDP, timestamp);
|
||||
new_proxy_participant (ppguid, 0, 0, &privguid, as_default, as_meta, &pp_plist, T_NEVER, vendorid, CF_IMPLICITLY_CREATED_PROXYPP | CF_PROXYPP_NO_SPDP, timestamp, seq);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1138,7 +1142,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat
|
|||
if ((pp = ephash_lookup_proxy_participant_guid (&ppguid)) == NULL)
|
||||
{
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " unknown-proxypp");
|
||||
if ((pp = implicitly_create_proxypp (&ppguid, datap, src_guid_prefix, vendorid, timestamp)) == NULL)
|
||||
if ((pp = implicitly_create_proxypp (&ppguid, datap, src_guid_prefix, vendorid, timestamp, 0)) == NULL)
|
||||
E ("?\n", err);
|
||||
/* Repeat regular SEDP trace for convenience */
|
||||
DDS_LOG(DDS_LC_DISCOVERY, "SEDP ST0 "PGUIDFMT" (cont)", PGUID (datap->endpoint_guid));
|
||||
|
@ -1185,18 +1189,10 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat
|
|||
}
|
||||
if (pwr || prd)
|
||||
{
|
||||
/* Cloud load balances by updating participant endpoints */
|
||||
|
||||
if (! vendor_is_cloud (vendorid))
|
||||
{
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " known\n");
|
||||
goto err;
|
||||
}
|
||||
|
||||
/* Re-bind the proxy participant to the discovery service - and do this if it is currently
|
||||
bound to another DS instance, because that other DS instance may have already failed and
|
||||
with a new one taking over, without our noticing it. */
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " known-DS");
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " known%s", vendor_is_cloud (vendorid) ? "-DS" : "");
|
||||
if (vendor_is_cloud (vendorid) && pp->implicitly_created && memcmp(&pp->privileged_pp_guid.prefix, src_guid_prefix, sizeof(pp->privileged_pp_guid.prefix)) != 0)
|
||||
{
|
||||
nn_etime_t never = { T_NEVER };
|
||||
|
@ -1261,7 +1257,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat
|
|||
{
|
||||
if (pwr)
|
||||
{
|
||||
update_proxy_writer (pwr, as);
|
||||
update_proxy_writer (pwr, as, xqos, timestamp);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -1281,7 +1277,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat
|
|||
{
|
||||
if (prd)
|
||||
{
|
||||
update_proxy_reader (prd, as);
|
||||
update_proxy_reader (prd, as, xqos, timestamp);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -1484,9 +1480,9 @@ static void handle_SEDP_CM (const struct receiver_state *rst, nn_entityid_t wr_e
|
|||
else
|
||||
{
|
||||
if ((proxypp = ephash_lookup_proxy_participant_guid (&decoded_data.participant_guid)) == NULL)
|
||||
proxypp = implicitly_create_proxypp (&decoded_data.participant_guid, &decoded_data, &rst->src_guid_prefix, rst->vendor, timestamp);
|
||||
proxypp = implicitly_create_proxypp (&decoded_data.participant_guid, &decoded_data, &rst->src_guid_prefix, rst->vendor, timestamp, 0);
|
||||
if (proxypp != NULL)
|
||||
update_proxy_participant_plist (proxypp, &decoded_data, UPD_PROXYPP_CM, timestamp);
|
||||
update_proxy_participant_plist (proxypp, 0, &decoded_data, UPD_PROXYPP_CM, timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1864,7 +1860,7 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str
|
|||
switch (srcguid.entityid.u)
|
||||
{
|
||||
case NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER:
|
||||
handle_SPDP (sampleinfo->rst, timestamp, statusinfo, datap, datasz);
|
||||
handle_SPDP (sampleinfo->rst, sampleinfo->seq, timestamp, statusinfo, datap, datasz);
|
||||
break;
|
||||
case NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER:
|
||||
case NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER:
|
||||
|
|
|
@ -375,6 +375,36 @@ static void remove_deleted_participant_guid (const struct nn_guid *guid, unsigne
|
|||
|
||||
/* PARTICIPANT ------------------------------------------------------ */
|
||||
|
||||
static bool update_qos_locked (struct entity_common *e, dds_qos_t *ent_qos, const dds_qos_t *xqos, nn_wctime_t timestamp)
|
||||
{
|
||||
uint64_t mask;
|
||||
|
||||
mask = nn_xqos_delta (ent_qos, xqos, QP_CHANGEABLE_MASK & ~(QP_RXO_MASK | QP_PARTITION)) & xqos->present;
|
||||
#if 0
|
||||
int a = (ent_qos->present & QP_TOPIC_DATA) ? (int) ent_qos->topic_data.length : 6;
|
||||
int b = (xqos->present & QP_TOPIC_DATA) ? (int) xqos->topic_data.length : 6;
|
||||
char *astr = (ent_qos->present & QP_TOPIC_DATA) ? (char *) ent_qos->topic_data.value : "(null)";
|
||||
char *bstr = (xqos->present & QP_TOPIC_DATA) ? (char *) xqos->topic_data.value : "(null)";
|
||||
printf ("%d: "PGUIDFMT" ent_qos %d \"%*.*s\" xqos %d \"%*.*s\" => mask %d\n",
|
||||
(int) getpid (), PGUID (e->guid),
|
||||
!!(ent_qos->present & QP_TOPIC_DATA), a, a, astr,
|
||||
!!(xqos->present & QP_TOPIC_DATA), b, b, bstr,
|
||||
!!(mask & QP_TOPIC_DATA));
|
||||
#endif
|
||||
DDS_LOG (DDS_LC_DISCOVERY, "update_qos_locked "PGUIDFMT" delta=%"PRIu64" QOS={", PGUID(e->guid), mask);
|
||||
nn_log_xqos(DDS_LC_DISCOVERY, xqos);
|
||||
DDS_LOG (DDS_LC_DISCOVERY, "}\n");
|
||||
|
||||
if (mask == 0)
|
||||
/* no change, or an as-yet unsupported one */
|
||||
return false;
|
||||
|
||||
nn_xqos_fini_mask (ent_qos, mask);
|
||||
nn_xqos_mergein_missing (ent_qos, xqos, mask);
|
||||
ddsi_plugin.builtintopic_write (e, timestamp, true);
|
||||
return true;
|
||||
}
|
||||
|
||||
static dds_return_t pp_allocate_entityid(nn_entityid_t *id, unsigned kind, struct participant *pp)
|
||||
{
|
||||
uint32_t id1;
|
||||
|
@ -674,6 +704,14 @@ dds_return_t new_participant (nn_guid_t *p_ppguid, unsigned flags, const nn_plis
|
|||
return new_participant_guid (p_ppguid, flags, plist);
|
||||
}
|
||||
|
||||
void update_participant_plist (struct participant *pp, const nn_plist_t *plist)
|
||||
{
|
||||
ddsrt_mutex_lock (&pp->e.lock);
|
||||
if (update_qos_locked (&pp->e, &pp->plist->qos, &plist->qos, now ()))
|
||||
spdp_write (pp);
|
||||
ddsrt_mutex_unlock (&pp->e.lock);
|
||||
}
|
||||
|
||||
static void delete_builtin_endpoint (const struct nn_guid *ppguid, unsigned entityid)
|
||||
{
|
||||
nn_guid_t guid;
|
||||
|
@ -2909,6 +2947,14 @@ struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, str
|
|||
return lowr;
|
||||
}
|
||||
|
||||
void update_writer_qos (struct writer *wr, const dds_qos_t *xqos)
|
||||
{
|
||||
ddsrt_mutex_lock (&wr->e.lock);
|
||||
if (update_qos_locked (&wr->e, wr->xqos, xqos, now ()))
|
||||
sedp_write_writer (wr);
|
||||
ddsrt_mutex_unlock (&wr->e.lock);
|
||||
}
|
||||
|
||||
static void gc_delete_writer (struct gcreq *gcreq)
|
||||
{
|
||||
struct writer *wr = gcreq->arg;
|
||||
|
@ -3432,6 +3478,13 @@ uint64_t reader_instance_id (const struct nn_guid *guid)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void update_reader_qos (struct reader *rd, const dds_qos_t *xqos)
|
||||
{
|
||||
ddsrt_mutex_lock (&rd->e.lock);
|
||||
if (update_qos_locked (&rd->e, rd->xqos, xqos, now ()))
|
||||
sedp_write_reader (rd);
|
||||
ddsrt_mutex_unlock (&rd->e.lock);
|
||||
}
|
||||
|
||||
/* PROXY-PARTICIPANT ------------------------------------------------ */
|
||||
static void gc_proxy_participant_lease (struct gcreq *gcreq)
|
||||
|
@ -3481,7 +3534,8 @@ void new_proxy_participant
|
|||
dds_duration_t tlease_dur,
|
||||
nn_vendorid_t vendor,
|
||||
unsigned custom_flags,
|
||||
nn_wctime_t timestamp
|
||||
nn_wctime_t timestamp,
|
||||
seqno_t seq
|
||||
)
|
||||
{
|
||||
/* No locking => iff all participants use unique guids, and sedp
|
||||
|
@ -3503,6 +3557,7 @@ void new_proxy_participant
|
|||
proxypp->vendor = vendor;
|
||||
proxypp->bes = bes;
|
||||
proxypp->prismtech_bes = prismtech_bes;
|
||||
proxypp->seq = seq;
|
||||
if (privileged_pp_guid) {
|
||||
proxypp->privileged_pp_guid = *privileged_pp_guid;
|
||||
} else {
|
||||
|
@ -3543,6 +3598,7 @@ void new_proxy_participant
|
|||
proxypp->as_meta = as_meta;
|
||||
proxypp->endpoints = NULL;
|
||||
proxypp->plist = nn_plist_dup (plist);
|
||||
nn_xqos_mergein_missing (&proxypp->plist->qos, &gv.default_plist_pp.qos, ~(uint64_t)0);
|
||||
ddsrt_avl_init (&proxypp_groups_treedef, &proxypp->groups);
|
||||
|
||||
if (custom_flags & CF_INC_KERNEL_SEQUENCE_NUMBERS)
|
||||
|
@ -3656,51 +3712,39 @@ void new_proxy_participant
|
|||
ddsrt_mutex_unlock (&proxypp->e.lock);
|
||||
}
|
||||
|
||||
int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp)
|
||||
int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, seqno_t seq, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp)
|
||||
{
|
||||
/* Currently, built-in processing is single-threaded, and it is only through this function and the proxy participant deletion (which necessarily happens when no-one else potentially references the proxy participant anymore). So at the moment, the lock is superfluous. */
|
||||
nn_plist_t *new_plist;
|
||||
nn_plist_t *new_plist = ddsrt_malloc (sizeof (*new_plist));
|
||||
nn_plist_init_empty (new_plist);
|
||||
nn_plist_mergein_missing (new_plist, datap, PP_PRISMTECH_NODE_NAME | PP_PRISMTECH_EXEC_NAME | PP_PRISMTECH_PROCESS_ID | PP_ENTITY_NAME, QP_USER_DATA);
|
||||
nn_plist_mergein_missing (new_plist, &gv.default_plist_pp, ~(uint64_t)0, ~(uint64_t)0);
|
||||
|
||||
new_plist = nn_plist_dup (datap);
|
||||
nn_plist_mergein_missing (new_plist, proxypp->plist, ~(uint64_t)0, ~(uint64_t)0);
|
||||
nn_plist_fini (proxypp->plist);
|
||||
ddsrt_free (proxypp->plist);
|
||||
proxypp->plist = new_plist;
|
||||
if (seq && seq > proxypp->seq)
|
||||
proxypp->seq = seq;
|
||||
|
||||
switch (source)
|
||||
{
|
||||
case UPD_PROXYPP_SPDP:
|
||||
ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true);
|
||||
update_qos_locked (&proxypp->e, &proxypp->plist->qos, &new_plist->qos, timestamp);
|
||||
nn_plist_fini (new_plist);
|
||||
ddsrt_free (new_plist);
|
||||
proxypp->proxypp_have_spdp = 1;
|
||||
break;
|
||||
|
||||
case UPD_PROXYPP_CM:
|
||||
nn_plist_fini (proxypp->plist);
|
||||
ddsrt_free (proxypp->plist);
|
||||
proxypp->plist = new_plist;
|
||||
proxypp->proxypp_have_cm = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int update_proxy_participant_plist (struct proxy_participant *proxypp, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp)
|
||||
int update_proxy_participant_plist (struct proxy_participant *proxypp, seqno_t seq, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp)
|
||||
{
|
||||
nn_plist_t tmp;
|
||||
|
||||
/* FIXME: find a better way of restricting which bits can get updated */
|
||||
ddsrt_mutex_lock (&proxypp->e.lock);
|
||||
switch (source)
|
||||
{
|
||||
case UPD_PROXYPP_SPDP:
|
||||
update_proxy_participant_plist_locked (proxypp, datap, source, timestamp);
|
||||
break;
|
||||
case UPD_PROXYPP_CM:
|
||||
tmp = *datap;
|
||||
tmp.present &=
|
||||
PP_PRISMTECH_NODE_NAME | PP_PRISMTECH_EXEC_NAME | PP_PRISMTECH_PROCESS_ID |
|
||||
PP_ENTITY_NAME;
|
||||
tmp.qos.present &= QP_PRISMTECH_ENTITY_FACTORY;
|
||||
update_proxy_participant_plist_locked (proxypp, &tmp, source, timestamp);
|
||||
break;
|
||||
}
|
||||
update_proxy_participant_plist_locked (proxypp, seq, datap, source, timestamp);
|
||||
ddsrt_mutex_unlock (&proxypp->e.lock);
|
||||
return 0;
|
||||
}
|
||||
|
@ -4064,7 +4108,6 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en
|
|||
else
|
||||
memset (&c->group_guid, 0, sizeof (c->group_guid));
|
||||
|
||||
|
||||
ref_proxy_participant (proxypp, c);
|
||||
}
|
||||
|
||||
|
@ -4175,7 +4218,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
|
|||
return 0;
|
||||
}
|
||||
|
||||
void update_proxy_writer (struct proxy_writer * pwr, struct addrset * as)
|
||||
void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp)
|
||||
{
|
||||
struct reader * rd;
|
||||
struct pwr_rd_match * m;
|
||||
|
@ -4203,10 +4246,12 @@ void update_proxy_writer (struct proxy_writer * pwr, struct addrset * as)
|
|||
m = ddsrt_avl_iter_next (&iter);
|
||||
}
|
||||
}
|
||||
|
||||
update_qos_locked (&pwr->e, pwr->c.xqos, xqos, timestamp);
|
||||
ddsrt_mutex_unlock (&pwr->e.lock);
|
||||
}
|
||||
|
||||
void update_proxy_reader (struct proxy_reader * prd, struct addrset * as)
|
||||
void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp)
|
||||
{
|
||||
struct prd_wr_match * m;
|
||||
nn_guid_t wrguid;
|
||||
|
@ -4255,6 +4300,8 @@ void update_proxy_reader (struct proxy_reader * prd, struct addrset * as)
|
|||
ddsrt_mutex_lock (&prd->e.lock);
|
||||
}
|
||||
}
|
||||
|
||||
update_qos_locked (&prd->e, prd->c.xqos, xqos, timestamp);
|
||||
ddsrt_mutex_unlock (&prd->e.lock);
|
||||
}
|
||||
|
||||
|
|
|
@ -1248,7 +1248,7 @@ void nn_plist_init_tables (void)
|
|||
ddsrt_once (&table_init_control, nn_plist_init_tables_real);
|
||||
}
|
||||
|
||||
static void plist_or_xqos_fini (void * __restrict dst, size_t shift)
|
||||
static void plist_or_xqos_fini (void * __restrict dst, size_t shift, uint64_t pmask, uint64_t qmask)
|
||||
{
|
||||
/* shift == 0: plist, shift > 0: just qos */
|
||||
struct flagset pfs, qfs;
|
||||
|
@ -1277,7 +1277,8 @@ static void plist_or_xqos_fini (void * __restrict dst, size_t shift)
|
|||
assert (shift == 0 || entry->plist_offset - shift < sizeof (dds_qos_t));
|
||||
size_t dstoff = entry->plist_offset - shift;
|
||||
struct flagset * const fs = (entry->flags & PDF_QOS) ? &qfs : &pfs;
|
||||
if ((*fs->present & entry->present_flag))
|
||||
uint64_t mask = (entry->flags & PDF_QOS) ? qmask : pmask;
|
||||
if (*fs->present & entry->present_flag & mask)
|
||||
{
|
||||
if (!(entry->flags & PDF_FUNCTION))
|
||||
fini_generic (dst, &dstoff, fs, entry->present_flag, entry->op.desc);
|
||||
|
@ -1285,8 +1286,8 @@ static void plist_or_xqos_fini (void * __restrict dst, size_t shift)
|
|||
entry->op.f.fini (dst, &dstoff, fs, entry->present_flag);
|
||||
}
|
||||
}
|
||||
if (pfs.present) { *pfs.present = *pfs.aliased = 0; }
|
||||
*qfs.present = *qfs.aliased = 0;
|
||||
if (pfs.present) { *pfs.present &= ~pmask; *pfs.aliased &= ~pmask; }
|
||||
*qfs.present &= ~qmask; *qfs.aliased &= ~qmask;
|
||||
}
|
||||
|
||||
static void plist_or_xqos_unalias (void * __restrict dst, size_t shift)
|
||||
|
@ -1441,7 +1442,7 @@ static void plist_or_xqos_addtomsg (struct nn_xmsg *xmsg, const void * __restric
|
|||
|
||||
void nn_plist_fini (nn_plist_t *plist)
|
||||
{
|
||||
plist_or_xqos_fini (plist, 0);
|
||||
plist_or_xqos_fini (plist, 0, ~(uint64_t)0, ~(uint64_t)0);
|
||||
}
|
||||
|
||||
void nn_plist_unalias (nn_plist_t *plist)
|
||||
|
@ -2322,18 +2323,19 @@ void nn_xqos_init_empty (dds_qos_t *dest)
|
|||
void nn_plist_init_default_participant (nn_plist_t *plist)
|
||||
{
|
||||
nn_plist_init_empty (plist);
|
||||
|
||||
plist->qos.present |= QP_PRISMTECH_ENTITY_FACTORY;
|
||||
plist->qos.entity_factory.autoenable_created_entities = 0;
|
||||
|
||||
plist->qos.present |= QP_USER_DATA;
|
||||
plist->qos.user_data.length = 0;
|
||||
plist->qos.user_data.value = NULL;
|
||||
}
|
||||
|
||||
static void xqos_init_default_common (dds_qos_t *xqos)
|
||||
{
|
||||
nn_xqos_init_empty (xqos);
|
||||
|
||||
xqos->present |= QP_PARTITION;
|
||||
xqos->partition.n = 0;
|
||||
xqos->partition.strs = NULL;
|
||||
|
||||
xqos->present |= QP_PRESENTATION;
|
||||
xqos->presentation.access_scope = DDS_PRESENTATION_INSTANCE;
|
||||
xqos->presentation.coherent_access = 0;
|
||||
|
@ -2374,10 +2376,31 @@ static void xqos_init_default_common (dds_qos_t *xqos)
|
|||
xqos->ignorelocal.value = DDS_IGNORELOCAL_NONE;
|
||||
}
|
||||
|
||||
void nn_xqos_init_default_reader (dds_qos_t *xqos)
|
||||
static void nn_xqos_init_default_endpoint (dds_qos_t *xqos)
|
||||
{
|
||||
xqos_init_default_common (xqos);
|
||||
|
||||
xqos->present |= QP_TOPIC_DATA;
|
||||
xqos->topic_data.length = 0;
|
||||
xqos->topic_data.value = NULL;
|
||||
|
||||
xqos->present |= QP_GROUP_DATA;
|
||||
xqos->group_data.length = 0;
|
||||
xqos->group_data.value = NULL;
|
||||
|
||||
xqos->present |= QP_USER_DATA;
|
||||
xqos->user_data.length = 0;
|
||||
xqos->user_data.value = NULL;
|
||||
|
||||
xqos->present |= QP_PARTITION;
|
||||
xqos->partition.n = 0;
|
||||
xqos->partition.strs = NULL;
|
||||
}
|
||||
|
||||
void nn_xqos_init_default_reader (dds_qos_t *xqos)
|
||||
{
|
||||
nn_xqos_init_default_endpoint (xqos);
|
||||
|
||||
xqos->present |= QP_RELIABILITY;
|
||||
xqos->reliability.kind = DDS_RELIABILITY_BEST_EFFORT;
|
||||
|
||||
|
@ -2400,7 +2423,7 @@ void nn_xqos_init_default_reader (dds_qos_t *xqos)
|
|||
|
||||
void nn_xqos_init_default_writer (dds_qos_t *xqos)
|
||||
{
|
||||
xqos_init_default_common (xqos);
|
||||
nn_xqos_init_default_endpoint (xqos);
|
||||
|
||||
xqos->present |= QP_DURABILITY_SERVICE;
|
||||
xqos->durability_service.service_cleanup_delay = 0;
|
||||
|
@ -2461,10 +2484,14 @@ void nn_xqos_init_default_topic (dds_qos_t *xqos)
|
|||
xqos->subscription_keys.key_list.strs = NULL;
|
||||
}
|
||||
|
||||
void nn_xqos_init_default_subscriber (dds_qos_t *xqos)
|
||||
static void nn_xqos_init_default_publisher_subscriber (dds_qos_t *xqos)
|
||||
{
|
||||
nn_xqos_init_empty (xqos);
|
||||
|
||||
xqos->present |= QP_GROUP_DATA;
|
||||
xqos->group_data.length = 0;
|
||||
xqos->group_data.value = NULL;
|
||||
|
||||
xqos->present |= QP_PRISMTECH_ENTITY_FACTORY;
|
||||
xqos->entity_factory.autoenable_created_entities = 1;
|
||||
|
||||
|
@ -2473,16 +2500,14 @@ void nn_xqos_init_default_subscriber (dds_qos_t *xqos)
|
|||
xqos->partition.strs = NULL;
|
||||
}
|
||||
|
||||
void nn_xqos_init_default_subscriber (dds_qos_t *xqos)
|
||||
{
|
||||
nn_xqos_init_default_publisher_subscriber (xqos);
|
||||
}
|
||||
|
||||
void nn_xqos_init_default_publisher (dds_qos_t *xqos)
|
||||
{
|
||||
nn_xqos_init_empty (xqos);
|
||||
|
||||
xqos->present |= QP_PRISMTECH_ENTITY_FACTORY;
|
||||
xqos->entity_factory.autoenable_created_entities = 1;
|
||||
|
||||
xqos->present |= QP_PARTITION;
|
||||
xqos->partition.n = 0;
|
||||
xqos->partition.strs = NULL;
|
||||
nn_xqos_init_default_publisher_subscriber (xqos);
|
||||
}
|
||||
|
||||
void nn_xqos_copy (dds_qos_t *dst, const dds_qos_t *src)
|
||||
|
@ -2493,7 +2518,12 @@ void nn_xqos_copy (dds_qos_t *dst, const dds_qos_t *src)
|
|||
|
||||
void nn_xqos_fini (dds_qos_t *xqos)
|
||||
{
|
||||
plist_or_xqos_fini (xqos, offsetof (nn_plist_t, qos));
|
||||
plist_or_xqos_fini (xqos, offsetof (nn_plist_t, qos), ~(uint64_t)0, ~(uint64_t)0);
|
||||
}
|
||||
|
||||
void nn_xqos_fini_mask (dds_qos_t *xqos, uint64_t mask)
|
||||
{
|
||||
plist_or_xqos_fini (xqos, offsetof (nn_plist_t, qos), ~(uint64_t)0, mask);
|
||||
}
|
||||
|
||||
void nn_xqos_unalias (dds_qos_t *xqos)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue