Invoke liveliness changed without holding pwr lock

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-11-21 10:57:21 +01:00
parent e781cda9e5
commit 801def8bd5
6 changed files with 238 additions and 108 deletions

View file

@ -233,16 +233,46 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
} }
case DDS_LIVELINESS_CHANGED_STATUS_ID: { case DDS_LIVELINESS_CHANGED_STATUS_ID: {
struct dds_liveliness_changed_status * const st = vst = &rd->m_liveliness_changed_status; struct dds_liveliness_changed_status * const st = vst = &rd->m_liveliness_changed_status;
if (data->add) { DDSRT_STATIC_ASSERT ((uint32_t) LIVELINESS_CHANGED_ADD_ALIVE == 0 &&
st->alive_count++; LIVELINESS_CHANGED_ADD_ALIVE < LIVELINESS_CHANGED_ADD_NOT_ALIVE &&
st->alive_count_change++; LIVELINESS_CHANGED_ADD_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_NOT_ALIVE &&
if (st->not_alive_count > 0) { LIVELINESS_CHANGED_REMOVE_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_ALIVE &&
LIVELINESS_CHANGED_REMOVE_ALIVE < LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE &&
LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE &&
LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < LIVELINESS_CHANGED_TWITCH &&
(uint32_t) LIVELINESS_CHANGED_TWITCH < UINT32_MAX);
assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_TWITCH);
switch ((enum liveliness_changed_data_extra) data->extra)
{
case LIVELINESS_CHANGED_ADD_ALIVE:
st->alive_count++;
st->alive_count_change++;
break;
case LIVELINESS_CHANGED_ADD_NOT_ALIVE:
st->not_alive_count++;
st->not_alive_count_change++;
break;
case LIVELINESS_CHANGED_REMOVE_NOT_ALIVE:
break;
case LIVELINESS_CHANGED_REMOVE_ALIVE:
st->alive_count--;
st->not_alive_count++;
st->not_alive_count_change++;
break;
case LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE:
st->alive_count--;
st->not_alive_count++;
st->not_alive_count_change++;
break;
case LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE:
st->not_alive_count--; st->not_alive_count--;
} st->alive_count++;
} else { st->alive_count_change++;
st->alive_count--; break;
st->not_alive_count++; case LIVELINESS_CHANGED_TWITCH:
st->not_alive_count_change++; st->alive_count_change++;
st->not_alive_count_change++;
break;
} }
st->last_publication_handle = data->handle; st->last_publication_handle = data->handle;
invoke = (lst->on_liveliness_changed != 0); invoke = (lst->on_liveliness_changed != 0);

View file

@ -48,14 +48,25 @@ struct proxy_group;
struct proxy_endpoint_common; struct proxy_endpoint_common;
typedef void (*ddsi2direct_directread_cb_t) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg); typedef void (*ddsi2direct_directread_cb_t) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg);
/* Liveliness changed is more complicated than just add/remove. Encode the event
in status_cb_data_t::extra and ignore status_cb_data_t::add */
enum liveliness_changed_data_extra {
LIVELINESS_CHANGED_ADD_ALIVE,
LIVELINESS_CHANGED_ADD_NOT_ALIVE,
LIVELINESS_CHANGED_REMOVE_NOT_ALIVE,
LIVELINESS_CHANGED_REMOVE_ALIVE,
LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE,
LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE,
LIVELINESS_CHANGED_TWITCH
};
typedef struct status_cb_data typedef struct status_cb_data
{ {
int raw_status_id; int raw_status_id;
uint32_t extra; uint32_t extra;
uint64_t handle; uint64_t handle;
bool add; bool add;
} } status_cb_data_t;
status_cb_data_t;
typedef void (*status_cb_t) (void *entity, const status_cb_data_t *data); typedef void (*status_cb_t) (void *entity, const status_cb_data_t *data);
@ -67,6 +78,8 @@ struct prd_wr_match {
struct rd_pwr_match { struct rd_pwr_match {
ddsrt_avl_node_t avlnode; ddsrt_avl_node_t avlnode;
ddsi_guid_t pwr_guid; ddsi_guid_t pwr_guid;
unsigned pwr_alive: 1; /* tracks pwr's alive state */
uint32_t pwr_alive_vclock; /* used to ensure progress */
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
nn_locator_t ssm_mc_loc; nn_locator_t ssm_mc_loc;
nn_locator_t ssm_src_loc; nn_locator_t ssm_src_loc;
@ -374,6 +387,7 @@ struct proxy_writer {
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */ unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */
#endif #endif
uint32_t alive_vclock; /* virtual clock counting transitions between alive/not-alive */
struct nn_defrag *defrag; /* defragmenter for this proxy writer; FIXME: perhaps shouldn't be for historical data */ struct nn_defrag *defrag; /* defragmenter for this proxy writer; FIXME: perhaps shouldn't be for historical data */
struct nn_reorder *reorder; /* message reordering for this proxy writer, out-of-sync readers can have their own, see pwr_rd_match */ struct nn_reorder *reorder; /* message reordering for this proxy writer, out-of-sync readers can have their own, see pwr_rd_match */
struct nn_dqueue *dqueue; /* delivery queue for asynchronous delivery (historical data is always delivered asynchronously) */ struct nn_dqueue *dqueue; /* delivery queue for asynchronous delivery (historical data is always delivered asynchronously) */
@ -652,8 +666,9 @@ int delete_proxy_reader (struct q_globals *gv, const struct ddsi_guid *guid, nn_
void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
int proxy_writer_set_alive (struct proxy_writer *pwr); void proxy_writer_set_alive_may_unlock (struct proxy_writer *pwr, bool notify);
int proxy_writer_set_notalive (struct proxy_writer *pwr); int proxy_writer_set_notalive (struct proxy_writer *pwr, bool notify);
void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_guid *pwrguid, bool notify);
int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp); int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp);
void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit); void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
@ -662,8 +677,6 @@ void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid,
rebuild them all (which only makes sense after previously having emptied them all). */ rebuild them all (which only makes sense after previously having emptied them all). */
void rebuild_or_clear_writer_addrsets(struct q_globals *gv, int rebuild); void rebuild_or_clear_writer_addrsets(struct q_globals *gv, int rebuild);
void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch);
void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok); void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok);
struct ddsi_writer_info; struct ddsi_writer_info;

View file

@ -129,9 +129,8 @@ void handle_pmd_message (const struct receiver_state *rst, nn_wctime_t timestamp
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT; ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if ((proxypp = ephash_lookup_proxy_participant_guid (rst->gv->guid_hash, &ppguid)) == NULL) if ((proxypp = ephash_lookup_proxy_participant_guid (rst->gv->guid_hash, &ppguid)) == NULL)
RSTTRACE (" PPunknown"); RSTTRACE (" PPunknown");
else if (kind == PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE &&
if (kind == PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE && (l = ddsrt_atomic_ldvoidp (&proxypp->minl_man)) != NULL)
(l = ddsrt_atomic_ldvoidp (&proxypp->minl_man)) != NULL)
{ {
/* Renew lease for entity with shortest manual-by-participant lease */ /* Renew lease for entity with shortest manual-by-participant lease */
lease_renew (l, now_et ()); lease_renew (l, now_et ());

View file

@ -64,6 +64,11 @@ struct deleted_participants_admin {
int64_t delay; int64_t delay;
}; };
struct proxy_writer_alive_state {
bool alive;
uint32_t vclock;
};
static int compare_guid (const void *va, const void *vb); static int compare_guid (const void *va, const void *vb);
static void augment_wr_prd_match (void *vnode, const void *vleft, const void *vright); static void augment_wr_prd_match (void *vnode, const void *vleft, const void *vright);
@ -1431,6 +1436,19 @@ static void free_wr_rd_match (struct wr_rd_match *m)
if (m) ddsrt_free (m); if (m) ddsrt_free (m);
} }
static void proxy_writer_get_alive_state_locked (struct proxy_writer *pwr, struct proxy_writer_alive_state *st)
{
st->alive = pwr->alive;
st->vclock = pwr->alive_vclock;
}
static void proxy_writer_get_alive_state (struct proxy_writer *pwr, struct proxy_writer_alive_state *st)
{
ddsrt_mutex_lock (&pwr->e.lock);
proxy_writer_get_alive_state_locked (pwr, st);
ddsrt_mutex_unlock (&pwr->e.lock);
}
static void writer_drop_connection (const struct ddsi_guid *wr_guid, const struct proxy_reader *prd) static void writer_drop_connection (const struct ddsi_guid *wr_guid, const struct proxy_reader *prd)
{ {
struct writer *wr; struct writer *wr;
@ -1488,14 +1506,63 @@ static void writer_drop_local_connection (const struct ddsi_guid *wr_guid, struc
} }
} }
void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch) static void reader_update_notify_pwr_alive_state (struct reader *rd, const struct proxy_writer *pwr, const struct proxy_writer_alive_state *alive_state)
{
struct rd_pwr_match *m;
bool notify = false;
int delta = 0; /* -1: alive -> not_alive; 0: unchanged; 1: not_alive -> alive */
ddsrt_mutex_lock (&rd->e.lock);
if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL)
{
if ((int32_t) (alive_state->vclock - m->pwr_alive_vclock) > 0)
{
delta = (int) alive_state->alive - (int) m->pwr_alive;
notify = true;
m->pwr_alive = alive_state->alive;
m->pwr_alive_vclock = alive_state->vclock;
}
}
ddsrt_mutex_unlock (&rd->e.lock);
if (delta < 0 && rd->rhc)
{
struct ddsi_writer_info wrinfo;
ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos);
ddsi_rhc_unregister_wr (rd->rhc, &wrinfo);
}
/* Liveliness changed events can race each other and can, potentially, be delivered
in a different order. */
if (notify && rd->status_cb)
{
status_cb_data_t data;
data.handle = pwr->e.iid;
if (delta == 0)
data.extra = (uint32_t) LIVELINESS_CHANGED_TWITCH;
else if (delta < 0)
data.extra = (uint32_t) LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE;
else
data.extra = (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
}
}
static void reader_update_notify_pwr_alive_state_guid (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, const struct proxy_writer_alive_state *alive_state)
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL)
reader_update_notify_pwr_alive_state (rd, pwr, alive_state);
}
static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr)
{ {
struct reader *rd; struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL) if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL)
{ {
struct rd_pwr_match *m; struct rd_pwr_match *m;
ddsrt_mutex_lock (&rd->e.lock); ddsrt_mutex_lock (&rd->e.lock);
if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL && unmatch) if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL)
ddsrt_avl_delete (&rd_writers_treedef, &rd->writers, m); ddsrt_avl_delete (&rd_writers_treedef, &rd->writers, m);
ddsrt_mutex_unlock (&rd->e.lock); ddsrt_mutex_unlock (&rd->e.lock);
if (m != NULL) if (m != NULL)
@ -1509,20 +1576,18 @@ void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy
if (rd->status_cb) if (rd->status_cb)
{ {
status_cb_data_t data; status_cb_data_t data;
data.add = false;
data.handle = pwr->e.iid; data.handle = pwr->e.iid;
data.add = false;
data.extra = (uint32_t) (m->pwr_alive ? LIVELINESS_CHANGED_REMOVE_ALIVE : LIVELINESS_CHANGED_REMOVE_NOT_ALIVE);
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data); (rd->status_cb) (rd->status_cb_entity, &data);
if (unmatch) data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
{ (rd->status_cb) (rd->status_cb_entity, &data);
data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
}
} }
} }
if (unmatch) free_rd_pwr_match (pwr->e.gv, m);
free_rd_pwr_match (pwr->e.gv, m);
} }
} }
@ -1548,9 +1613,9 @@ static void reader_drop_local_connection (const struct ddsi_guid *rd_guid, const
if (rd->status_cb) if (rd->status_cb)
{ {
status_cb_data_t data; status_cb_data_t data;
data.add = false;
data.handle = wr->e.iid; data.handle = wr->e.iid;
data.add = false;
data.extra = (uint32_t) LIVELINESS_CHANGED_REMOVE_ALIVE;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data); (rd->status_cb) (rd->status_cb_entity, &data);
@ -1789,12 +1854,14 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd)
} }
} }
static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, nn_count_t *init_count) static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, nn_count_t *init_count, const struct proxy_writer_alive_state *alive_state)
{ {
struct rd_pwr_match *m = ddsrt_malloc (sizeof (*m)); struct rd_pwr_match *m = ddsrt_malloc (sizeof (*m));
ddsrt_avl_ipath_t path; ddsrt_avl_ipath_t path;
m->pwr_guid = pwr->e.guid; m->pwr_guid = pwr->e.guid;
m->pwr_alive = alive_state->alive;
m->pwr_alive_vclock = alive_state->vclock;
ddsrt_mutex_lock (&rd->e.lock); ddsrt_mutex_lock (&rd->e.lock);
@ -1848,9 +1915,14 @@ static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr,
if (rd->status_cb) if (rd->status_cb)
{ {
status_cb_data_t data; status_cb_data_t data;
data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
data.add = true;
data.handle = pwr->e.iid; data.handle = pwr->e.iid;
data.add = true;
data.extra = (uint32_t) (alive_state->alive ? LIVELINESS_CHANGED_ADD_ALIVE : LIVELINESS_CHANGED_ADD_NOT_ALIVE);
data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data); (rd->status_cb) (rd->status_cb_entity, &data);
} }
} }
@ -1882,14 +1954,15 @@ static void reader_add_local_connection (struct reader *rd, struct writer *wr)
if (rd->status_cb) if (rd->status_cb)
{ {
status_cb_data_t data; status_cb_data_t data;
data.add = true;
data.handle = wr->e.iid; data.handle = wr->e.iid;
data.add = true;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; data.extra = (uint32_t) LIVELINESS_CHANGED_ADD_ALIVE;
(rd->status_cb) (rd->status_cb_entity, &data);
data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data); (rd->status_cb) (rd->status_cb_entity, &data);
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
} }
} }
} }
@ -2007,16 +2080,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
qxev_pwr_entityid (pwr, &rd->e.guid.prefix); qxev_pwr_entityid (pwr, &rd->e.guid.prefix);
ELOGDISC (pwr, "\n"); ELOGDISC (pwr, "\n");
if (rd->status_cb)
{
status_cb_data_t data;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
data.add = true;
data.handle = pwr->e.iid;
(rd->status_cb) (rd->status_cb_entity, &data);
}
return; return;
already_matched: already_matched:
@ -2188,6 +2251,7 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r
const int isb1 = (is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) != 0); const int isb1 = (is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) != 0);
dds_qos_policy_id_t reason; dds_qos_policy_id_t reason;
nn_count_t init_count; nn_count_t init_count;
struct proxy_writer_alive_state alive_state;
if (isb0 != isb1) if (isb0 != isb1)
return; return;
if (rd->e.onlylocal) if (rd->e.onlylocal)
@ -2197,8 +2261,18 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r
reader_qos_mismatch (rd, reason); reader_qos_mismatch (rd, reason);
return; return;
} }
reader_add_connection (rd, pwr, &init_count);
/* Initialze the reader's tracking information for the writer liveliness state to something
sensible, but that may be outdated by the time the reader gets added to the writer's list
of matching readers. */
proxy_writer_get_alive_state (pwr, &alive_state);
reader_add_connection (rd, pwr, &init_count, &alive_state);
proxy_writer_add_connection (pwr, rd, tnow, init_count); proxy_writer_add_connection (pwr, rd, tnow, init_count);
/* Once everything is set up: update with the latest state, any updates to the alive state
happening in parallel will cause this to become a no-op. */
proxy_writer_get_alive_state (pwr, &alive_state);
reader_update_notify_pwr_alive_state (rd, pwr, &alive_state);
} }
static bool ignore_local_p (const ddsi_guid_t *guid1, const ddsi_guid_t *guid2, const struct dds_qos *xqos1, const struct dds_qos *xqos2) static bool ignore_local_p (const ddsi_guid_t *guid1, const ddsi_guid_t *guid2, const struct dds_qos *xqos1, const struct dds_qos *xqos2)
@ -4274,6 +4348,7 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
pwr->nackfragcount = 0; pwr->nackfragcount = 0;
pwr->last_fragnum_reset = 0; pwr->last_fragnum_reset = 0;
pwr->alive = 1; pwr->alive = 1;
pwr->alive_vclock = 0;
ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, 1); ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, 1);
if (is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor)) { if (is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor)) {
/* The DDSI built-in proxy writers always deliver /* The DDSI built-in proxy writers always deliver
@ -4453,7 +4528,7 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
{ {
struct pwr_rd_match *m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers); struct pwr_rd_match *m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers);
ddsrt_avl_delete (&pwr_readers_treedef, &pwr->readers, m); ddsrt_avl_delete (&pwr_readers_treedef, &pwr->readers, m);
reader_drop_connection (&m->rd_guid, pwr, true); reader_drop_connection (&m->rd_guid, pwr);
update_reader_init_acknack_count (&pwr->e.gv->logconfig, pwr->e.gv->guid_hash, &m->rd_guid, m->count); update_reader_init_acknack_count (&pwr->e.gv->logconfig, pwr->e.gv->guid_hash, &m->rd_guid, m->count);
free_pwr_rd_match (m); free_pwr_rd_match (m);
} }
@ -4490,62 +4565,86 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false); builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false);
ephash_remove_proxy_writer_guid (gv->guid_hash, pwr); ephash_remove_proxy_writer_guid (gv->guid_hash, pwr);
ddsrt_mutex_unlock (&gv->lock); ddsrt_mutex_unlock (&gv->lock);
if (proxy_writer_set_notalive (pwr) != DDS_RETCODE_OK) if (proxy_writer_set_notalive (pwr, false) != DDS_RETCODE_OK)
GVLOGDISC ("proxy_writer_set_notalive failed for "PGUIDFMT"\n", PGUID(*guid)); GVLOGDISC ("proxy_writer_set_notalive failed for "PGUIDFMT"\n", PGUID(*guid));
/* Set lease expiry for this pwr to never so that the pwr will not be set
to alive again while it is scheduled for being deleted. */
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER)
lease_renew (pwr->lease, (nn_etime_t){ T_NEVER });
gcreq_proxy_writer (pwr); gcreq_proxy_writer (pwr);
return DDS_RETCODE_OK; return DDS_RETCODE_OK;
} }
int proxy_writer_set_alive (struct proxy_writer *pwr) static void proxy_writer_notify_liveliness_change_may_unlock (struct proxy_writer *pwr)
{ {
/* Caller has pwr->e.lock, so we can safely read pwr->alive. For updating struct proxy_writer_alive_state alive_state;
* this field this function is also taking pwr->c.proxypp->e.lock */ proxy_writer_get_alive_state_locked (pwr, &alive_state);
ddsrt_avl_iter_t it;
if (pwr->alive) struct ddsi_guid rdguid;
return DDS_RETCODE_PRECONDITION_NOT_MET; struct pwr_rd_match *m;
memset (&rdguid, 0, sizeof (rdguid));
while (pwr->alive_vclock == alive_state.vclock &&
(m = ddsrt_avl_lookup_succ (&pwr_readers_treedef, &pwr->readers, &rdguid)) != NULL)
{
rdguid = m->rd_guid;
ddsrt_mutex_unlock (&pwr->e.lock);
/* unlocking pwr means alive state may have changed already; we break out of the loop once we
detect this but there for the reader in the current iteration, anything is possible */
reader_update_notify_pwr_alive_state_guid (&rdguid, pwr, &alive_state);
ddsrt_mutex_lock (&pwr->e.lock);
}
}
void proxy_writer_set_alive_may_unlock (struct proxy_writer *pwr, bool notify)
{
/* Caller has pwr->e.lock, so we can safely read pwr->alive. Updating pwr->alive requires
also taking pwr->c.proxypp->e.lock because pwr->alive <=> (pwr->lease in proxypp's lease
heap). */
assert (!pwr->alive);
ddsrt_mutex_lock (&pwr->c.proxypp->e.lock); ddsrt_mutex_lock (&pwr->c.proxypp->e.lock);
pwr->alive = true; pwr->alive = true;
pwr->alive_vclock++;
for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL)
{
status_cb_data_t data;
data.add = true;
data.handle = pwr->e.iid;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
}
}
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxy_participant_add_pwr_lease_locked (pwr->c.proxypp, pwr); proxy_participant_add_pwr_lease_locked (pwr->c.proxypp, pwr);
ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock); ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock);
return DDS_RETCODE_OK;
if (notify)
proxy_writer_notify_liveliness_change_may_unlock (pwr);
} }
int proxy_writer_set_notalive (struct proxy_writer *pwr) int proxy_writer_set_notalive (struct proxy_writer *pwr, bool notify)
{ {
/* Caller should not have taken pwr->e.lock and pwr->c.proxypp->e.lock; /* Caller should not have taken pwr->e.lock and pwr->c.proxypp->e.lock;
* this function takes both locks to update pwr->alive value */ * this function takes both locks to update pwr->alive value */
int ret = DDS_RETCODE_OK;
ddsrt_mutex_lock (&pwr->e.lock); ddsrt_mutex_lock (&pwr->e.lock);
if (!pwr->alive) if (!pwr->alive)
ret = DDS_RETCODE_PRECONDITION_NOT_MET; {
ddsrt_mutex_unlock (&pwr->e.lock);
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
ddsrt_mutex_lock (&pwr->c.proxypp->e.lock);
pwr->alive = false;
pwr->alive_vclock++;
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr);
ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock);
if (notify)
proxy_writer_notify_liveliness_change_may_unlock (pwr);
ddsrt_mutex_unlock (&pwr->e.lock);
return DDS_RETCODE_OK;
}
void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_guid *pwrguid, bool notify)
{
struct proxy_writer *pwr;
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, pwrguid)) == NULL)
GVLOGDISC (" "PGUIDFMT"?\n", PGUID (*pwrguid));
else else
{ {
ddsrt_mutex_lock (&pwr->c.proxypp->e.lock); GVLOGDISC ("proxy_writer_set_notalive_guid ("PGUIDFMT")", PGUID (*pwrguid));
pwr->alive = false; if (proxy_writer_set_notalive (pwr, notify) == DDS_RETCODE_PRECONDITION_NOT_MET)
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) GVLOGDISC (" pwr was not alive");
proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr); GVLOGDISC ("\n");
ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock);
} }
ddsrt_mutex_unlock (&pwr->e.lock);
return ret;
} }
/* PROXY-READER ----------------------------------------------------- */ /* PROXY-READER ----------------------------------------------------- */

View file

@ -275,27 +275,8 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
delete_proxy_participant_by_guid (gv, &g, now(), 1); delete_proxy_participant_by_guid (gv, &g, now(), 1);
break; break;
case EK_PROXY_WRITER: case EK_PROXY_WRITER:
{ proxy_writer_set_notalive_guid (gv, &g, true);
struct proxy_writer *pwr;
ddsrt_avl_iter_t it;
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, &g)) == NULL)
{
GVLOGDISC (" "PGUIDFMT"?\n", PGUID (g));
ddsrt_mutex_lock (&gv->leaseheap_lock);
continue;
}
GVLOGDISC ("proxy_writer_set_notalive ("PGUIDFMT")", PGUID (g));
if (proxy_writer_set_notalive (pwr) == DDS_RETCODE_PRECONDITION_NOT_MET)
{
GVLOGDISC (" pwr was not alive");
ddsrt_mutex_lock (&gv->leaseheap_lock);
continue;
}
GVLOGDISC ("\n");
for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
reader_drop_connection (&m->rd_guid, pwr, false);
break; break;
}
case EK_PARTICIPANT: case EK_PARTICIPANT:
case EK_READER: case EK_READER:
case EK_WRITER: case EK_WRITER:

View file

@ -2114,18 +2114,26 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
return; return;
} }
/* Proxy participant's "automatic" lease has to be renewed always, manual-by-participant one only
for data published by the application. If pwr->lease exists, it is in some manual lease mode,
so check whether it is actually in manual-by-topic mode before renewing it. As pwr->lease is
set once (during entity creation) we can read it outside the lock, keeping all the lease
renewals together. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow); lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man)) != NULL && renew_manbypp_lease) if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man)) != NULL && renew_manbypp_lease)
lease_renew (lease, tnow); lease_renew (lease, tnow);
if (pwr->lease && pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC)
lease_renew (pwr->lease, tnow);
/* Shouldn't lock the full writer, but will do so for now */ /* Shouldn't lock the full writer, but will do so for now */
ddsrt_mutex_lock (&pwr->e.lock); ddsrt_mutex_lock (&pwr->e.lock);
if (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC && pwr->c.xqos->liveliness.lease_duration != T_NEVER) /* A change in transition from not-alive to alive is relatively complicated
lease_renew (pwr->lease, tnow); and may involve temporarily unlocking the proxy writer during the process
(to avoid unnecessarily holding pwr->e.lock while invoking listeners on
the reader) */
if (!pwr->alive) if (!pwr->alive)
proxy_writer_set_alive (pwr); proxy_writer_set_alive_may_unlock (pwr, true);
/* Don't accept data when reliable readers exist and we haven't yet seen /* Don't accept data when reliable readers exist and we haven't yet seen
a heartbeat telling us what the "current" sequence number of the writer a heartbeat telling us what the "current" sequence number of the writer