diff --git a/src/core/ddsc/src/dds_whc.c b/src/core/ddsc/src/dds_whc.c index 19f9680..8e06b22 100644 --- a/src/core/ddsc/src/dds_whc.c +++ b/src/core/ddsc/src/dds_whc.c @@ -96,7 +96,7 @@ struct whc_impl { }; struct whc_sample_iter_impl { - struct whc_impl *whc; + struct whc_sample_iter_base c; bool first; }; @@ -135,39 +135,39 @@ static struct whc_node *whc_findseq (const struct whc_impl *whc, seqno_t seq); static void insert_whcn_in_hash (struct whc_impl *whc, struct whc_node *whcn); static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn); static int compare_seq (const void *va, const void *vb); -static unsigned whc_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list); static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list); static void get_state_locked(const struct whc_impl *whc, struct whc_state *st); -static unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list); -static void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list); -static void whc_get_state(const struct whc *whc, struct whc_state *st); -static int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk); -static seqno_t whc_next_seq (const struct whc *whc, seqno_t seq); -static bool whc_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample); -static bool whc_borrow_sample_key (const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample); -static void whc_return_sample (struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info); -static unsigned whc_downgrade_to_volatile (struct whc *whc, struct whc_state *st); -static void whc_sample_iter_init (const struct whc *whc, struct whc_sample_iter *opaque_it); -static bool whc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample); -static void whc_free (struct whc *whc); +static unsigned whc_default_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list); +static unsigned whc_default_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list); +static void whc_default_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list); +static void whc_default_get_state(const struct whc *whc, struct whc_state *st); +static int whc_default_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk); +static seqno_t whc_default_next_seq (const struct whc *whc, seqno_t seq); +static bool whc_default_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample); +static bool whc_default_borrow_sample_key (const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample); +static void whc_default_return_sample (struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info); +static unsigned whc_default_downgrade_to_volatile (struct whc *whc, struct whc_state *st); +static void whc_default_sample_iter_init (const struct whc *whc, struct whc_sample_iter *opaque_it); +static bool whc_default_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample); +static void whc_default_free (struct whc *whc); static const ut_avlTreedef_t whc_seq_treedef = UT_AVL_TREEDEF_INITIALIZER (offsetof (struct whc_intvnode, avlnode), offsetof (struct whc_intvnode, min), compare_seq, 0); static const struct whc_ops whc_ops = { - .insert = whc_insert, - .remove_acked_messages = whc_remove_acked_messages, - .free_deferred_free_list = whc_free_deferred_free_list, - .get_state = whc_get_state, - .next_seq = whc_next_seq, - .borrow_sample = whc_borrow_sample, - .borrow_sample_key = whc_borrow_sample_key, - .return_sample = whc_return_sample, - .sample_iter_init = whc_sample_iter_init, - .sample_iter_borrow_next = whc_sample_iter_borrow_next, - .downgrade_to_volatile = whc_downgrade_to_volatile, - .free = whc_free + .insert = whc_default_insert, + .remove_acked_messages = whc_default_remove_acked_messages, + .free_deferred_free_list = whc_default_free_deferred_free_list, + .get_state = whc_default_get_state, + .next_seq = whc_default_next_seq, + .borrow_sample = whc_default_borrow_sample, + .borrow_sample_key = whc_default_borrow_sample_key, + .return_sample = whc_default_return_sample, + .sample_iter_init = whc_default_sample_iter_init, + .sample_iter_borrow_next = whc_default_sample_iter_borrow_next, + .downgrade_to_volatile = whc_default_downgrade_to_volatile, + .free = whc_default_free }; #if USE_EHH @@ -405,7 +405,7 @@ static void free_whc_node_contents (struct whc_node *whcn) } } -void whc_free (struct whc *whc_generic) +void whc_default_free (struct whc *whc_generic) { /* Freeing stuff without regards for maintaining data structures */ struct whc_impl * const whc = (struct whc_impl *)whc_generic; @@ -467,7 +467,7 @@ static void get_state_locked(const struct whc_impl *whc, struct whc_state *st) } } -static void whc_get_state(const struct whc *whc_generic, struct whc_state *st) +static void whc_default_get_state(const struct whc *whc_generic, struct whc_state *st) { const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; os_mutexLock ((struct os_mutex *)&whc->lock); @@ -518,7 +518,7 @@ static struct whc_node *find_nextseq_intv (struct whc_intvnode **p_intv, const s } } -static seqno_t whc_next_seq (const struct whc *whc_generic, seqno_t seq) +static seqno_t whc_default_next_seq (const struct whc *whc_generic, seqno_t seq) { const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; struct whc_node *n; @@ -596,7 +596,7 @@ static int whcn_in_tlidx (const struct whc_impl *whc, const struct whc_idxnode * } } -static unsigned whc_downgrade_to_volatile (struct whc *whc_generic, struct whc_state *st) +static unsigned whc_default_downgrade_to_volatile (struct whc *whc_generic, struct whc_state *st) { struct whc_impl * const whc = (struct whc_impl *)whc_generic; seqno_t old_max_drop_seq; @@ -638,8 +638,8 @@ static unsigned whc_downgrade_to_volatile (struct whc *whc_generic, struct whc_s them all. */ old_max_drop_seq = whc->max_drop_seq; whc->max_drop_seq = 0; - cnt = whc_remove_acked_messages_full (whc, old_max_drop_seq, &deferred_free_list); - whc_free_deferred_free_list (whc_generic, deferred_free_list); + cnt = whc_default_remove_acked_messages_full (whc, old_max_drop_seq, &deferred_free_list); + whc_default_free_deferred_free_list (whc_generic, deferred_free_list); assert (whc->max_drop_seq == old_max_drop_seq); get_state_locked(whc, st); os_mutexUnlock (&whc->lock); @@ -789,13 +789,13 @@ static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *defe } } -static void whc_free_deferred_free_list (struct whc *whc_generic, struct whc_node *deferred_free_list) +static void whc_default_free_deferred_free_list (struct whc *whc_generic, struct whc_node *deferred_free_list) { struct whc_impl * const whc = (struct whc_impl *)whc_generic; free_deferred_free_list(whc, deferred_free_list); } -static unsigned whc_remove_acked_messages_noidx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) +static unsigned whc_default_remove_acked_messages_noidx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) { struct whc_intvnode *intv; struct whc_node *whcn; @@ -871,7 +871,7 @@ static unsigned whc_remove_acked_messages_noidx (struct whc_impl *whc, seqno_t m return ndropped; } -static unsigned whc_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) +static unsigned whc_default_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) { struct whc_intvnode *intv; struct whc_node *whcn; @@ -1005,7 +1005,7 @@ static unsigned whc_remove_acked_messages_full (struct whc_impl *whc, seqno_t ma return ndropped; } -static unsigned whc_remove_acked_messages (struct whc *whc_generic, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list) +static unsigned whc_default_remove_acked_messages (struct whc *whc_generic, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list) { struct whc_impl * const whc = (struct whc_impl *)whc_generic; unsigned cnt; @@ -1018,7 +1018,7 @@ static unsigned whc_remove_acked_messages (struct whc *whc_generic, seqno_t max_ { struct whc_state tmp; get_state_locked(whc, &tmp); - TRACE_WHC(("whc_remove_acked_messages(%p max_drop_seq %"PRId64")\n", (void *)whc, max_drop_seq)); + TRACE_WHC(("whc_default_remove_acked_messages(%p max_drop_seq %"PRId64")\n", (void *)whc, max_drop_seq)); TRACE_WHC((" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %u tl %u\n", tmp.min_seq, tmp.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth)); } @@ -1026,15 +1026,15 @@ static unsigned whc_remove_acked_messages (struct whc *whc_generic, seqno_t max_ check_whc (whc); if (whc->idxdepth == 0) - cnt = whc_remove_acked_messages_noidx (whc, max_drop_seq, deferred_free_list); + cnt = whc_default_remove_acked_messages_noidx (whc, max_drop_seq, deferred_free_list); else - cnt = whc_remove_acked_messages_full (whc, max_drop_seq, deferred_free_list); + cnt = whc_default_remove_acked_messages_full (whc, max_drop_seq, deferred_free_list); get_state_locked(whc, whcst); os_mutexUnlock (&whc->lock); return cnt; } -static struct whc_node *whc_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata) +static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata) { struct whc_node *newn = NULL; @@ -1095,7 +1095,7 @@ static struct whc_node *whc_insert_seq (struct whc_impl *whc, seqno_t max_drop_s return newn; } -static int whc_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk) +static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk) { struct whc_impl * const whc = (struct whc_impl *)whc_generic; struct whc_node *newn = NULL; @@ -1112,7 +1112,7 @@ static int whc_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t se { struct whc_state whcst; get_state_locked(whc, &whcst); - TRACE_WHC(("whc_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%"PRIx32")\n", (void *)whc, max_drop_seq, seq, (void*)plist, (void*)serdata, serdata->hash)); + TRACE_WHC(("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%"PRIx32")\n", (void *)whc, max_drop_seq, seq, (void*)plist, (void*)serdata, serdata->hash)); TRACE_WHC((" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %u tl %u\n", whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth)); } @@ -1126,7 +1126,7 @@ static int whc_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t se assert (whc->seq_size == 0 || seq > whc->maxseq_node->seq); /* Always insert in seq admin */ - newn = whc_insert_seq (whc, max_drop_seq, seq, plist, serdata); + newn = whc_default_insert_seq (whc, max_drop_seq, seq, plist, serdata); TRACE_WHC((" whcn %p:", (void*)newn)); @@ -1179,7 +1179,7 @@ static int whc_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t se /* Special case for dropping everything beyond T-L history when the new sample is being auto-acknowledged (for lack of reliable readers), and the keep-last T-L history is shallower than the keep-last regular history (normal path handles this via pruning in - whc_remove_acked_messages, but that never happens when there are no readers). */ + whc_default_remove_acked_messages, but that never happens when there are no readers). */ if (seq <= max_drop_seq && whc->tldepth > 0 && whc->idxdepth > whc->tldepth) { unsigned pos = idxn->headidx + whc->idxdepth - whc->tldepth; @@ -1246,7 +1246,7 @@ static void make_borrowed_sample(struct whc_borrowed_sample *sample, struct whc_ sample->last_rexmit_ts = whcn->last_rexmit_ts; } -static bool whc_borrow_sample (const struct whc *whc_generic, seqno_t seq, struct whc_borrowed_sample *sample) +static bool whc_default_borrow_sample (const struct whc *whc_generic, seqno_t seq, struct whc_borrowed_sample *sample) { const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; struct whc_node *whcn; @@ -1263,7 +1263,7 @@ static bool whc_borrow_sample (const struct whc *whc_generic, seqno_t seq, struc return found; } -static bool whc_borrow_sample_key (const struct whc *whc_generic, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample) +static bool whc_default_borrow_sample_key (const struct whc *whc_generic, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample) { const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; struct whc_node *whcn; @@ -1304,7 +1304,7 @@ static void return_sample_locked (struct whc_impl *whc, struct whc_borrowed_samp } } -static void whc_return_sample (struct whc *whc_generic, struct whc_borrowed_sample *sample, bool update_retransmit_info) +static void whc_default_return_sample (struct whc *whc_generic, struct whc_borrowed_sample *sample, bool update_retransmit_info) { struct whc_impl * const whc = (struct whc_impl *)whc_generic; os_mutexLock (&whc->lock); @@ -1312,18 +1312,17 @@ static void whc_return_sample (struct whc *whc_generic, struct whc_borrowed_samp os_mutexUnlock (&whc->lock); } -static void whc_sample_iter_init (const struct whc *whc_generic, struct whc_sample_iter *opaque_it) +static void whc_default_sample_iter_init (const struct whc *whc_generic, struct whc_sample_iter *opaque_it) { - const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; - struct whc_sample_iter_impl *it = (struct whc_sample_iter_impl *)opaque_it->opaque.opaque; - it->whc = (struct whc_impl *)whc; + struct whc_sample_iter_impl *it = (struct whc_sample_iter_impl *)opaque_it; + it->c.whc = (struct whc *)whc_generic; it->first = true; } -static bool whc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample) +static bool whc_default_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample) { - struct whc_sample_iter_impl * const it = (struct whc_sample_iter_impl *)opaque_it->opaque.opaque; - struct whc_impl * const whc = it->whc; + struct whc_sample_iter_impl * const it = (struct whc_sample_iter_impl *)opaque_it; + struct whc_impl * const whc = (struct whc_impl *)it->c.whc; struct whc_node *whcn; struct whc_intvnode *intv; seqno_t seq; diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index d7707f2..c8d062b 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -54,6 +54,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" q_time.c q_transmit.c q_inverse_uint32_set.c + q_whc.c q_xevent.c q_xmsg.c q_freelist.c diff --git a/src/core/ddsi/include/ddsi/q_whc.h b/src/core/ddsi/include/ddsi/q_whc.h index c9cfa55..1a99e65 100644 --- a/src/core/ddsi/include/ddsi/q_whc.h +++ b/src/core/ddsi/include/ddsi/q_whc.h @@ -42,8 +42,12 @@ struct whc_state { an iter on the stack without specifying an implementation. If future changes or implementations require more, these can be adjusted. An implementation should check things fit at compile time. */ -#define WHC_SAMPLE_ITER_SIZE (2*sizeof(void *)) +#define WHC_SAMPLE_ITER_SIZE (1*sizeof(void *)) +struct whc_sample_iter_base { + struct whc *whc; +}; struct whc_sample_iter { + struct whc_sample_iter_base c; union { char opaque[WHC_SAMPLE_ITER_SIZE]; /* cover alignment requirements: */ @@ -90,6 +94,43 @@ struct whc { const struct whc_ops *ops; }; +inline seqno_t whc_next_seq (const struct whc *whc, seqno_t seq) { + return whc->ops->next_seq (whc, seq); +} +inline void whc_get_state (const struct whc *whc, struct whc_state *st) { + whc->ops->get_state (whc, st); +} +inline bool whc_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample) { + return whc->ops->borrow_sample (whc, seq, sample); +} +inline bool whc_borrow_sample_key (const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample) { + return whc->ops->borrow_sample_key (whc, serdata_key, sample); +} +inline void whc_return_sample (struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info) { + whc->ops->return_sample (whc, sample, update_retransmit_info); +} +inline void whc_sample_iter_init (const struct whc *whc, struct whc_sample_iter *it) { + whc->ops->sample_iter_init (whc, it); +} +inline bool whc_sample_iter_borrow_next (struct whc_sample_iter *it, struct whc_borrowed_sample *sample) { + return it->c.whc->ops->sample_iter_borrow_next (it, sample); +} +inline void whc_free (struct whc *whc) { + whc->ops->free (whc); +} +inline int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk) { + return whc->ops->insert (whc, max_drop_seq, seq, plist, serdata, tk); +} +inline unsigned whc_downgrade_to_volatile (struct whc *whc, struct whc_state *st) { + return whc->ops->downgrade_to_volatile (whc, st); +} +inline unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list) { + return whc->ops->remove_acked_messages (whc, max_drop_seq, whcst, deferred_free_list); +} +inline void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list) { + whc->ops->free_deferred_free_list (whc, deferred_free_list); +} + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsi/src/q_debmon.c b/src/core/ddsi/src/q_debmon.c index 4b1f839..6671cac 100644 --- a/src/core/ddsi/src/q_debmon.c +++ b/src/core/ddsi/src/q_debmon.c @@ -188,7 +188,7 @@ static int print_participants (struct thread_state1 *self, ddsi_tran_conn_t conn continue; os_mutexLock (&w->e.lock); print_endpoint_common (conn, "wr", &w->e, &w->c, w->xqos, w->topic); - w->whc->ops->get_state(w->whc, &whcst); + whc_get_state(w->whc, &whcst); x += cpf (conn, " whc [%lld,%lld] unacked %"PRIuSIZE"%s [%u,%u] seq %lld seq_xmit %lld cs_seq %lld\n", whcst.min_seq, whcst.max_seq, whcst.unacked_bytes, w->throttling ? " THROTTLING" : "", diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 0e61baf..d9d6de9 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -1314,7 +1314,7 @@ static void writer_drop_connection (const struct nn_guid * wr_guid, const struct } } os_mutexUnlock (&wr->e.lock); - wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list); + whc_free_deferred_free_list (wr->whc, deferred_free_list); free_wr_prd_match (m); } } @@ -1606,8 +1606,8 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd) { struct whc_sample_iter it; struct whc_borrowed_sample sample; - wr->whc->ops->sample_iter_init(wr->whc, &it); - while (wr->whc->ops->sample_iter_borrow_next(&it, &sample)) + whc_sample_iter_init(wr->whc, &it); + while (whc_sample_iter_borrow_next(&it, &sample)) { struct proxy_writer_info pwr_info; struct ddsi_serdata *payload = sample.serdata; @@ -2540,7 +2540,7 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, stru unsigned n; assert (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER); ASSERT_MUTEX_HELD (&wr->e.lock); - n = wr->whc->ops->remove_acked_messages (wr->whc, writer_max_drop_seq (wr), whcst, deferred_free_list); + n = whc_remove_acked_messages (wr->whc, writer_max_drop_seq (wr), whcst, deferred_free_list); /* when transitioning from >= low-water to < low-water, signal anyone waiting in throttle_writer() */ if (wr->throttling && whcst->unacked_bytes <= wr->whc_low) @@ -2854,7 +2854,7 @@ static void gc_delete_writer (struct gcreq *gcreq) (wr->status_cb) (wr->status_cb_entity, NULL); } - wr->whc->ops->free (wr->whc); + whc_free (wr->whc); #ifdef DDSI_INCLUDE_SSM if (wr->ssm_as) unref_addrset (wr->ssm_as); @@ -2950,7 +2950,7 @@ int delete_writer (const struct nn_guid *guid) be the usual case), do it immediately. If more data is still coming in (which can't really happen at the moment, but might again in the future) it'll potentially be discarded. */ - wr->whc->ops->get_state(wr->whc, &whcst); + whc_get_state(wr->whc, &whcst); if (whcst.unacked_bytes == 0) { nn_log (LC_DISCOVERY, "delete_writer(guid %x:%x:%x:%x) - no unack'ed samples\n", PGUID (*guid)); @@ -2982,12 +2982,12 @@ void writer_exit_startup_mode (struct writer *wr) struct whc_state whcst; wr->startup_mode = 0; cnt += remove_acked_messages (wr, &whcst, &deferred_free_list); - cnt += wr->whc->ops->downgrade_to_volatile (wr->whc, &whcst); + cnt += whc_downgrade_to_volatile (wr->whc, &whcst); writer_clear_retransmitting (wr); nn_log (LC_DISCOVERY, " %x:%x:%x:%x: dropped %u samples\n", PGUID(wr->e.guid), cnt); } os_mutexUnlock (&wr->e.lock); - wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list); + whc_free_deferred_free_list (wr->whc, deferred_free_list); } uint64_t writer_instance_id (const struct nn_guid *guid) @@ -4316,7 +4316,7 @@ static void proxy_reader_set_delete_and_ack_all_messages (struct proxy_reader *p writer_clear_retransmitting (wr); } os_mutexUnlock (&wr->e.lock); - wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list); + whc_free_deferred_free_list (wr->whc, deferred_free_list); } wrguid = wrguid_next; diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 1fe232a..bf5846d 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -638,7 +638,7 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state * static seqno_t grow_gap_to_next_seq (const struct writer *wr, seqno_t seq) { - seqno_t next_seq = wr->whc->ops->next_seq (wr->whc, seq - 1); + seqno_t next_seq = whc_next_seq (wr->whc, seq - 1); seqno_t seq_xmit = READ_SEQ_XMIT(wr); if (next_seq == MAX_SEQ_NUMBER) /* no next sample */ return seq_xmit + 1; @@ -835,7 +835,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac else { /* There's actually no guarantee that we need this information */ - wr->whc->ops->get_state(wr->whc, &whcst); + whc_get_state(wr->whc, &whcst); } /* If this reader was marked as "non-responsive" in the past, it's now responding again, @@ -944,7 +944,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac { seqno_t seq = seqbase + i; struct whc_borrowed_sample sample; - if (wr->whc->ops->borrow_sample (wr->whc, seq, &sample)) + if (whc_borrow_sample (wr->whc, seq, &sample)) { if (!wr->retransmitting && sample.unacked) writer_set_retransmitting (wr); @@ -982,7 +982,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac } } - wr->whc->ops->return_sample(wr->whc, &sample, true); + whc_return_sample(wr->whc, &sample, true); } else if (gapstart == -1) { @@ -1076,7 +1076,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac TRACE ((")")); out: os_mutexUnlock (&wr->e.lock); - wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list); + whc_free_deferred_free_list (wr->whc, deferred_free_list); return 1; } @@ -1498,7 +1498,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N /* Resend the requested fragments if we still have the sample, send a Gap if we don't have them anymore. */ - if (wr->whc->ops->borrow_sample (wr->whc, seq, &sample)) + if (whc_borrow_sample (wr->whc, seq, &sample)) { const unsigned base = msg->fragmentNumberState.bitmap_base - 1; int enqueued = 1; @@ -1514,7 +1514,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N enqueued = qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0); } } - wr->whc->ops->return_sample (wr->whc, &sample, false); + whc_return_sample (wr->whc, &sample, false); } else { @@ -1541,7 +1541,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N to give the reader a chance to nack the rest and make sure hearbeats will go out at a reasonably high rate for a while */ struct whc_state whcst; - wr->whc->ops->get_state(wr->whc, &whcst); + whc_get_state(wr->whc, &whcst); force_heartbeat_to_peer (wr, &whcst, prd, 1); writer_hbcontrol_note_asyncwrite (wr, now_mt ()); } diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 3b87032..c6243b4 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -884,7 +884,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist if (!do_insert) res = 0; - else if ((insres = wr->whc->ops->insert (wr->whc, writer_max_drop_seq (wr), seq, plist, serdata, tk)) < 0) + else if ((insres = whc_insert (wr->whc, writer_max_drop_seq (wr), seq, plist, serdata, tk)) < 0) res = insres; else res = 1; @@ -893,7 +893,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) { struct whc_state whcst; - wr->whc->ops->get_state(wr->whc, &whcst); + whc_get_state(wr->whc, &whcst); if (WHCST_ISEMPTY(&whcst)) assert (wr->c.pp->builtins_deleted); } @@ -947,7 +947,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) nn_mtime_t tnow = now_mt (); const nn_mtime_t abstimeout = add_duration_to_mtime (tnow, nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time)); struct whc_state whcst; - wr->whc->ops->get_state(wr->whc, &whcst); + whc_get_state(wr->whc, &whcst); { #ifndef NDEBUG @@ -992,7 +992,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) thread_state_asleep (lookup_thread_state()); result = os_condTimedWait (&wr->throttle_cond, &wr->e.lock, &timeout); thread_state_awake (lookup_thread_state()); - wr->whc->ops->get_state(wr->whc, &whcst); + whc_get_state(wr->whc, &whcst); } if (result == os_resultTimeout) { @@ -1064,7 +1064,7 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p /* If WHC overfull, block. */ { struct whc_state whcst; - wr->whc->ops->get_state(wr->whc, &whcst); + whc_get_state(wr->whc, &whcst); if (whcst.unacked_bytes > wr->whc_high) { os_result ores; @@ -1119,7 +1119,7 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p { struct whc_state whcst; if (wr->heartbeat_xevent) - wr->whc->ops->get_state(wr->whc, &whcst); + whc_get_state(wr->whc, &whcst); /* Note the subtlety of enqueueing with the lock held but transmitting without holding the lock. Still working on diff --git a/src/core/ddsi/src/q_whc.c b/src/core/ddsi/src/q_whc.c new file mode 100644 index 0000000..dd7e95c --- /dev/null +++ b/src/core/ddsi/src/q_whc.c @@ -0,0 +1,28 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include "os/os.h" +#include "ddsi/q_rtps.h" +#include "ddsi/q_time.h" +#include "ddsi/q_whc.h" + +extern inline seqno_t whc_next_seq (const struct whc *whc, seqno_t seq); +extern inline void whc_get_state (const struct whc *whc, struct whc_state *st); +extern inline bool whc_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample); +extern inline bool whc_borrow_sample_key (const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample); +extern inline void whc_return_sample (struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info); +extern inline void whc_sample_iter_init (const struct whc *whc, struct whc_sample_iter *it); +extern inline bool whc_sample_iter_borrow_next (struct whc_sample_iter *it, struct whc_borrowed_sample *sample); +extern inline void whc_free (struct whc *whc); +extern int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk); +extern unsigned whc_downgrade_to_volatile (struct whc *whc, struct whc_state *st); +extern unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list); +extern void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list); diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 97f7f6a..9a3729d 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -607,7 +607,7 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt assert (wr->reliable); os_mutexLock (&wr->e.lock); - wr->whc->ops->get_state(wr->whc, &whcst); + whc_get_state(wr->whc, &whcst); if (!writer_must_have_hb_scheduled (wr, &whcst)) { hbansreq = 1; /* just for trace */ @@ -1023,7 +1023,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e nn_xmsg_free (mpayload); os_mutexLock (&spdp_wr->e.lock); - if (spdp_wr->whc->ops->borrow_sample_key (spdp_wr->whc, sd, &sample)) + if (whc_borrow_sample_key (spdp_wr->whc, sd, &sample)) { /* Claiming it is new rather than a retransmit so that the rexmit limiting won't kick in. It is best-effort and therefore the @@ -1031,7 +1031,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e place anyway. Nor is it necessary to fiddle with heartbeat control stuff. */ enqueue_sample_wrlock_held (spdp_wr, sample.seq, sample.plist, sample.serdata, prd, 1); - spdp_wr->whc->ops->return_sample(spdp_wr->whc, &sample, false); + whc_return_sample(spdp_wr->whc, &sample, false); #ifndef NDEBUG sample_found = true; #endif