A few fixes in the liveliness qos implementation:
- moved de-registration of a lease to a separate function, so that it is called synchronously when a pwr is deleted, to prevent the lease from expiring (before this change, the lease was renewed with t_never, but that does not work because of a check with early out in lease_renew) - handle proxypp->owns_lease correctly: when an OpenSplice instance was running in the same network and participants from OpenSplice were discovered, the lease-renewal fails in case the proxy participant's lease was not registered in minl_auto, which happens when the proxypp depends on its parent (ddsi2) participant. - increased lease duration in create_delete_writer stress test to avoid failed tests due to delayed pmd messages - fixed the indenting in liveliness tests source file Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
This commit is contained in:
parent
da17a9f5d1
commit
e97e340650
6 changed files with 675 additions and 632 deletions
File diff suppressed because it is too large
Load diff
|
@ -42,6 +42,7 @@ void lease_management_term (struct q_globals *gv);
|
|||
struct lease *lease_new (nn_etime_t texpire, int64_t tdur, struct entity_common *e);
|
||||
struct lease *lease_clone (const struct lease *l);
|
||||
void lease_register (struct lease *l);
|
||||
void lease_unregister (struct lease *l);
|
||||
void lease_free (struct lease *l);
|
||||
void lease_renew (struct lease *l, nn_etime_t tnow);
|
||||
void lease_set_expiry (struct lease *l, nn_etime_t when);
|
||||
|
|
|
@ -610,13 +610,15 @@ static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_
|
|||
else if (existing_entity->kind == EK_PROXY_PARTICIPANT)
|
||||
{
|
||||
struct proxy_participant *proxypp = (struct proxy_participant *) existing_entity;
|
||||
struct lease *lease;
|
||||
int interesting = 0;
|
||||
RSTTRACE ("SPDP ST0 "PGUIDFMT" (known)", PGUID (datap->participant_guid));
|
||||
/* SPDP processing is so different from normal processing that we are
|
||||
even skipping the automatic lease renewal. Note that proxy writers
|
||||
that are not alive are not set alive here. This is done only when
|
||||
data is received from a particular pwr (in handle_regular) */
|
||||
lease_renew (ddsrt_atomic_ldvoidp (&proxypp->minl_auto), now_et ());
|
||||
if ((lease = ddsrt_atomic_ldvoidp (&proxypp->minl_auto)) != NULL)
|
||||
lease_renew (lease, now_et ());
|
||||
ddsrt_mutex_lock (&proxypp->e.lock);
|
||||
if (proxypp->implicitly_created || seq > proxypp->seq)
|
||||
{
|
||||
|
|
|
@ -3662,51 +3662,67 @@ static void gc_proxy_participant_lease (struct gcreq *gcreq)
|
|||
gcreq_free (gcreq);
|
||||
}
|
||||
|
||||
void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct lease *newlease)
|
||||
{
|
||||
/* Lease renewal is done by the receive thread without locking the
|
||||
proxy participant (and I'd like to keep it that way), but that
|
||||
means we must guarantee that the lease pointer remains valid once
|
||||
loaded.
|
||||
|
||||
By loading/storing the pointer atomically, we ensure we always
|
||||
read a valid (or once valid) value, by delaying the freeing
|
||||
through the garbage collector, we ensure whatever lease update
|
||||
occurs in parallel completes before the memory is released.
|
||||
|
||||
The lease_renew(never) call ensures the lease will never expire
|
||||
while we are messing with it. */
|
||||
ddsrt_mutex_lock (&proxypp->e.lock);
|
||||
if (proxypp->owns_lease)
|
||||
{
|
||||
const nn_etime_t never = { T_NEVER };
|
||||
struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease);
|
||||
struct lease *oldlease = proxypp->lease;
|
||||
lease_renew (oldlease, never);
|
||||
gcreq->arg = oldlease;
|
||||
gcreq_enqueue (gcreq);
|
||||
proxypp->owns_lease = 0;
|
||||
}
|
||||
proxypp->lease = newlease;
|
||||
/* FIXME: replace proxypp lease in leaseheap_auto? */
|
||||
ddsrt_mutex_unlock (&proxypp->e.lock);
|
||||
}
|
||||
|
||||
static void proxy_participant_replace_minl (struct proxy_participant *proxypp, bool manbypp, struct lease *lnew)
|
||||
{
|
||||
/* By loading/storing the pointer atomically, we ensure we always
|
||||
read a valid (or once valid) lease. By delaying freeing the lease
|
||||
through the garbage collector, we ensure whatever lease update
|
||||
occurs in parallel completes before the memory is released. */
|
||||
const nn_etime_t never = { T_NEVER };
|
||||
struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease);
|
||||
struct lease *lease_old = ddsrt_atomic_ldvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto);
|
||||
lease_renew (lease_old, never); /* ensures lease will not expire while it is replaced */
|
||||
lease_unregister (lease_old); /* ensures lease will not expire while it is replaced */
|
||||
gcreq->arg = lease_old;
|
||||
gcreq_enqueue (gcreq);
|
||||
ddsrt_atomic_stvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto, lnew);
|
||||
}
|
||||
|
||||
void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct lease *newlease)
|
||||
{
|
||||
ddsrt_mutex_lock (&proxypp->e.lock);
|
||||
if (proxypp->owns_lease)
|
||||
{
|
||||
struct lease *minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto);
|
||||
ddsrt_fibheap_delete (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
|
||||
if (minl == proxypp->lease)
|
||||
{
|
||||
if ((minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto)) != NULL)
|
||||
{
|
||||
dds_duration_t trem = minl->tdur - proxypp->lease->tdur;
|
||||
assert (trem >= 0);
|
||||
nn_etime_t texp = add_duration_to_etime (now_et(), trem);
|
||||
struct lease *lnew = lease_new (texp, minl->tdur, minl->entity);
|
||||
proxy_participant_replace_minl (proxypp, false, lnew);
|
||||
lease_register (lnew);
|
||||
}
|
||||
else
|
||||
{
|
||||
proxy_participant_replace_minl (proxypp, false, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/* Lease renewal is done by the receive thread without locking the
|
||||
proxy participant (and I'd like to keep it that way), but that
|
||||
means we must guarantee that the lease pointer remains valid once
|
||||
loaded.
|
||||
|
||||
By loading/storing the pointer atomically, we ensure we always
|
||||
read a valid (or once valid) value, by delaying the freeing
|
||||
through the garbage collector, we ensure whatever lease update
|
||||
occurs in parallel completes before the memory is released.
|
||||
|
||||
The lease_unregister call ensures the lease will never expire
|
||||
while we are messing with it. */
|
||||
struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease);
|
||||
lease_unregister (proxypp->lease);
|
||||
gcreq->arg = proxypp->lease;
|
||||
gcreq_enqueue (gcreq);
|
||||
proxypp->owns_lease = 0;
|
||||
}
|
||||
proxypp->lease = newlease;
|
||||
|
||||
ddsrt_mutex_unlock (&proxypp->e.lock);
|
||||
}
|
||||
|
||||
static void proxy_participant_add_pwr_lease_locked (struct proxy_participant * proxypp, const struct proxy_writer * pwr)
|
||||
{
|
||||
struct lease *minl_prev;
|
||||
|
@ -3764,7 +3780,6 @@ static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant
|
|||
}
|
||||
else
|
||||
{
|
||||
assert (manbypp);
|
||||
proxy_participant_replace_minl (proxypp, manbypp, NULL);
|
||||
}
|
||||
}
|
||||
|
@ -3829,10 +3844,16 @@ void new_proxy_participant
|
|||
{
|
||||
struct proxy_participant *privpp;
|
||||
privpp = ephash_lookup_proxy_participant_guid (gv->guid_hash, &proxypp->privileged_pp_guid);
|
||||
|
||||
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_auto);
|
||||
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_man);
|
||||
ddsrt_atomic_stvoidp (&proxypp->minl_man, NULL);
|
||||
|
||||
if (privpp != NULL && privpp->is_ddsi2_pp)
|
||||
{
|
||||
proxypp->lease = privpp->lease;
|
||||
proxypp->owns_lease = 0;
|
||||
ddsrt_atomic_stvoidp (&proxypp->minl_auto, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -3849,10 +3870,6 @@ void new_proxy_participant
|
|||
proxypp->lease = lease_new (texp, dur, &proxypp->e);
|
||||
proxypp->owns_lease = 1;
|
||||
|
||||
/* Init heap for leases */
|
||||
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_auto);
|
||||
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_man);
|
||||
|
||||
/* Add the proxypp lease to heap so that monitoring liveliness will include this lease
|
||||
and uses the shortest duration for proxypp and all its pwr's (with automatic liveliness) */
|
||||
ddsrt_fibheap_insert (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
|
||||
|
@ -3863,7 +3880,6 @@ void new_proxy_participant
|
|||
by the lease from the proxy writer in proxy_participant_add_pwr_lease_locked. This old shortest
|
||||
lease is freed, so that's why we need a clone and not the proxypp's lease in the heap. */
|
||||
ddsrt_atomic_stvoidp (&proxypp->minl_auto, (void *) lease_clone (proxypp->lease));
|
||||
ddsrt_atomic_stvoidp (&proxypp->minl_man, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4071,6 +4087,7 @@ static void unref_proxy_participant (struct proxy_participant *proxypp, struct p
|
|||
assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_man) == NULL);
|
||||
assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL);
|
||||
assert (!compare_guid (&minl_auto->entity->guid, &proxypp->e.guid));
|
||||
lease_unregister (minl_auto);
|
||||
lease_free (minl_auto);
|
||||
lease_free (proxypp->lease);
|
||||
}
|
||||
|
@ -4556,6 +4573,7 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
|
|||
GVLOGDISC ("- unknown\n");
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
}
|
||||
|
||||
/* Set "deleting" flag in particular for Lite, to signal to the receive path it can't
|
||||
trust rdary[] anymore, which is because removing the proxy writer from the hash
|
||||
table will prevent the readers from looking up the proxy writer, and consequently
|
||||
|
@ -4565,6 +4583,9 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
|
|||
builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false);
|
||||
ephash_remove_proxy_writer_guid (gv->guid_hash, pwr);
|
||||
ddsrt_mutex_unlock (&gv->lock);
|
||||
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER &&
|
||||
pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC)
|
||||
lease_unregister (pwr->lease);
|
||||
if (proxy_writer_set_notalive (pwr, false) != DDS_RETCODE_OK)
|
||||
GVLOGDISC ("proxy_writer_set_notalive failed for "PGUIDFMT"\n", PGUID(*guid));
|
||||
gcreq_proxy_writer (pwr);
|
||||
|
|
|
@ -121,10 +121,10 @@ void lease_register (struct lease *l) /* FIXME: make lease admin struct */
|
|||
force_lease_check (gv->gcreq_queue);
|
||||
}
|
||||
|
||||
void lease_free (struct lease *l)
|
||||
void lease_unregister (struct lease *l)
|
||||
{
|
||||
struct q_globals * const gv = l->entity->gv;
|
||||
GVTRACE ("lease_free(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid));
|
||||
GVTRACE ("lease_unregister(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid));
|
||||
ddsrt_mutex_lock (&gv->leaseheap_lock);
|
||||
if (l->tsched.v != TSCHED_NOT_ON_HEAP)
|
||||
{
|
||||
|
@ -132,12 +132,18 @@ void lease_free (struct lease *l)
|
|||
l->tsched.v = TSCHED_NOT_ON_HEAP;
|
||||
}
|
||||
ddsrt_mutex_unlock (&gv->leaseheap_lock);
|
||||
ddsrt_free (l);
|
||||
|
||||
/* see lease_register() */
|
||||
force_lease_check (gv->gcreq_queue);
|
||||
}
|
||||
|
||||
void lease_free (struct lease *l)
|
||||
{
|
||||
struct q_globals * const gv = l->entity->gv;
|
||||
GVTRACE ("lease_free(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid));
|
||||
ddsrt_free (l);
|
||||
}
|
||||
|
||||
void lease_renew (struct lease *l, nn_etime_t tnowE)
|
||||
{
|
||||
nn_etime_t tend_new = add_duration_to_etime (tnowE, l->tdur);
|
||||
|
|
|
@ -617,6 +617,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
|
|||
struct proxy_reader *prd;
|
||||
struct wr_prd_match *rn;
|
||||
struct writer *wr;
|
||||
struct lease *lease;
|
||||
ddsi_guid_t src, dst;
|
||||
seqno_t seqbase;
|
||||
seqno_t seq_xmit;
|
||||
|
@ -668,7 +669,8 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
|
|||
return 1;
|
||||
}
|
||||
|
||||
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto), tnow);
|
||||
if ((lease = ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto)) != NULL)
|
||||
lease_renew (lease, tnow);
|
||||
if (!wr->reliable) /* note: reliability can't be changed */
|
||||
{
|
||||
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" not a reliable writer!)", PGUID (src), PGUID (dst));
|
||||
|
@ -1111,6 +1113,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
const seqno_t lastseq = fromSN (msg->lastSN);
|
||||
struct handle_Heartbeat_helper_arg arg;
|
||||
struct proxy_writer *pwr;
|
||||
struct lease *lease;
|
||||
ddsi_guid_t src, dst;
|
||||
|
||||
src.prefix = rst->src_guid_prefix;
|
||||
|
@ -1131,15 +1134,15 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
RSTTRACE (PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst));
|
||||
return 1;
|
||||
}
|
||||
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
|
||||
if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto)) != NULL)
|
||||
lease_renew (lease, tnow);
|
||||
RSTTRACE (PGUIDFMT" -> "PGUIDFMT":", PGUID (src), PGUID (dst));
|
||||
ddsrt_mutex_lock (&pwr->e.lock);
|
||||
if (msg->smhdr.flags & HEARTBEAT_FLAG_LIVELINESS &&
|
||||
pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC &&
|
||||
pwr->c.xqos->liveliness.lease_duration != T_NEVER)
|
||||
{
|
||||
struct lease *lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man);
|
||||
if (lease != NULL)
|
||||
if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man)) != NULL)
|
||||
lease_renew (lease, tnow);
|
||||
lease_renew (pwr->lease, tnow);
|
||||
}
|
||||
|
@ -1253,6 +1256,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime
|
|||
const nn_fragment_number_t fragnum = msg->lastFragmentNum - 1; /* we do 0-based */
|
||||
ddsi_guid_t src, dst;
|
||||
struct proxy_writer *pwr;
|
||||
struct lease *lease;
|
||||
|
||||
src.prefix = rst->src_guid_prefix;
|
||||
src.entityid = msg->writerId;
|
||||
|
@ -1272,7 +1276,8 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime
|
|||
return 1;
|
||||
}
|
||||
|
||||
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
|
||||
if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto)) != NULL)
|
||||
lease_renew (lease, tnow);
|
||||
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"", PGUID (src), PGUID (dst));
|
||||
ddsrt_mutex_lock (&pwr->e.lock);
|
||||
|
||||
|
@ -1361,6 +1366,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
|
|||
struct proxy_reader *prd;
|
||||
struct wr_prd_match *rn;
|
||||
struct writer *wr;
|
||||
struct lease *lease;
|
||||
struct whc_borrowed_sample sample;
|
||||
ddsi_guid_t src, dst;
|
||||
nn_count_t *countp;
|
||||
|
@ -1397,7 +1403,8 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
|
|||
return 1;
|
||||
}
|
||||
|
||||
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto), tnow);
|
||||
if ((lease = ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto)) != NULL)
|
||||
lease_renew (lease, tnow);
|
||||
if (!wr->reliable) /* note: reliability can't be changed */
|
||||
{
|
||||
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" not a reliable writer)", PGUID (src), PGUID (dst));
|
||||
|
@ -1609,6 +1616,7 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm
|
|||
|
||||
struct proxy_writer *pwr;
|
||||
struct pwr_rd_match *wn;
|
||||
struct lease *lease;
|
||||
ddsi_guid_t src, dst;
|
||||
seqno_t gapstart, listbase;
|
||||
int32_t last_included_rel;
|
||||
|
@ -1642,7 +1650,8 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm
|
|||
return 1;
|
||||
}
|
||||
|
||||
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
|
||||
if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto)) != NULL)
|
||||
lease_renew (lease, tnow);
|
||||
ddsrt_mutex_lock (&pwr->e.lock);
|
||||
if ((wn = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst)) == NULL)
|
||||
{
|
||||
|
@ -2119,7 +2128,8 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
so check whether it is actually in manual-by-topic mode before renewing it. As pwr->lease is
|
||||
set once (during entity creation) we can read it outside the lock, keeping all the lease
|
||||
renewals together. */
|
||||
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
|
||||
if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto)) != NULL)
|
||||
lease_renew (lease, tnow);
|
||||
if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man)) != NULL && renew_manbypp_lease)
|
||||
lease_renew (lease, tnow);
|
||||
if (pwr->lease && pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue