From 61d98b46a69b46c9ebaa8ea89259d552e379aa33 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Wed, 1 Aug 2018 08:37:49 +0200 Subject: [PATCH] abstract WHC and pull it out of core DDSI code Signed-off-by: Erik Boasson --- src/core/ddsc/.fileids | 1 + src/core/ddsc/CMakeLists.txt | 2 + src/core/ddsc/src/dds__types.h | 1 + src/core/ddsc/src/dds__whc.h | 27 + .../{ddsi/src/q_whc.c => ddsc/src/dds_whc.c} | 477 ++++++++++++++---- src/core/ddsc/src/dds_writer.c | 63 ++- src/core/ddsi/.fileids | 1 - src/core/ddsi/CMakeLists.txt | 1 - src/core/ddsi/include/ddsi/q_entity.h | 29 +- src/core/ddsi/include/ddsi/q_hbcontrol.h | 13 +- src/core/ddsi/include/ddsi/q_transmit.h | 3 +- src/core/ddsi/include/ddsi/q_whc.h | 139 +++-- src/core/ddsi/src/q_debmon.c | 9 +- src/core/ddsi/src/q_entity.c | 198 +++----- src/core/ddsi/src/q_init.c | 2 +- src/core/ddsi/src/q_receive.c | 104 ++-- src/core/ddsi/src/q_transmit.c | 93 ++-- src/core/ddsi/src/q_xevent.c | 39 +- 18 files changed, 733 insertions(+), 469 deletions(-) create mode 100644 src/core/ddsc/src/dds__whc.h rename src/core/{ddsi/src/q_whc.c => ddsc/src/dds_whc.c} (68%) diff --git a/src/core/ddsc/.fileids b/src/core/ddsc/.fileids index fad8ca9..89472e2 100644 --- a/src/core/ddsc/.fileids +++ b/src/core/ddsc/.fileids @@ -31,3 +31,4 @@ 69 src/dds_report.c 70 src/dds_builtin.c 72 src/dds_guardcond.c +73 src/dds_whc.c diff --git a/src/core/ddsc/CMakeLists.txt b/src/core/ddsc/CMakeLists.txt index e53f2d2..ab4740d 100644 --- a/src/core/ddsc/CMakeLists.txt +++ b/src/core/ddsc/CMakeLists.txt @@ -41,6 +41,7 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src" dds_guardcond.c dds_subscriber.c dds_write.c + dds_whc.c ) PREPEND(hdrs_public_ddsc "$$" @@ -80,6 +81,7 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src" dds__types.h dds__write.h dds__writer.h + dds__whc.h q__osplser.h ) diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index 4dca40f..b8700e5 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -190,6 +190,7 @@ typedef struct dds_writer struct nn_xpack * m_xp; struct writer * m_wr; os_mutex m_call_lock; + struct whc *m_whc; /* FIXME: ownership still with underlying DDSI writer (cos of DDSI built-in writers )*/ /* Status metrics */ diff --git a/src/core/ddsc/src/dds__whc.h b/src/core/ddsc/src/dds__whc.h new file mode 100644 index 0000000..0c5a9c2 --- /dev/null +++ b/src/core/ddsc/src/dds__whc.h @@ -0,0 +1,27 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef DDS__WHC_H +#define DDS__WHC_H + +#include "ddsi/q_whc.h" + +#if defined (__cplusplus) +extern "C" { +#endif + +struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth); + +#if defined (__cplusplus) +} +#endif + +#endif /* Q_WHC_H */ diff --git a/src/core/ddsi/src/q_whc.c b/src/core/ddsc/src/dds_whc.c similarity index 68% rename from src/core/ddsi/src/q_whc.c rename to src/core/ddsc/src/dds_whc.c index c8a162d..95383f1 100644 --- a/src/core/ddsi/src/q_whc.c +++ b/src/core/ddsc/src/dds_whc.c @@ -17,10 +17,95 @@ #include "ddsi/ddsi_ser.h" #include "ddsi/q_unused.h" #include "ddsi/q_config.h" -#include "ddsi/q_whc.h" #include "q__osplser.h" +#include "dds__whc.h" #include "dds__tkmap.h" +#include "util/ut_avl.h" +#include "util/ut_hopscotch.h" +#include "ddsi/q_time.h" +#include "ddsi/q_rtps.h" +#include "ddsi/q_freelist.h" + +#define USE_EHH 0 + +struct whc_node { + struct whc_node *next_seq; /* next in this interval */ + struct whc_node *prev_seq; /* prev in this interval */ + struct whc_idxnode *idxnode; /* NULL if not in index */ + unsigned idxnode_pos; /* index in idxnode.hist */ + seqno_t seq; + uint64_t total_bytes; /* cumulative number of bytes up to and including this node */ + size_t size; + struct nn_plist *plist; /* 0 if nothing special */ + unsigned unacked: 1; /* counted in whc::unacked_bytes iff 1 */ + unsigned borrowed: 1; /* at most one can borrow it at any time */ + nn_mtime_t last_rexmit_ts; + unsigned rexmit_count; + struct serdata *serdata; +}; + +struct whc_intvnode { + ut_avlNode_t avlnode; + seqno_t min; + seqno_t maxp1; + struct whc_node *first; /* linked list of seqs with contiguous sequence numbers [min,maxp1) */ + struct whc_node *last; /* valid iff first != NULL */ +}; + +struct whc_idxnode { + int64_t iid; + seqno_t prune_seq; + struct tkmap_instance *tk; + unsigned headidx; +#if __STDC_VERSION__ >= 199901L + struct whc_node *hist[]; +#else + struct whc_node *hist[1]; +#endif +}; + +#if USE_EHH +struct whc_seq_entry { + seqno_t seq; + struct whc_node *whcn; +}; +#endif + +struct whc_impl { + struct whc common; + os_mutex lock; + unsigned seq_size; + size_t unacked_bytes; + size_t sample_overhead; + uint64_t total_bytes; /* total number of bytes pushed in */ + unsigned is_transient_local: 1; + unsigned hdepth; /* 0 = unlimited */ + unsigned tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */ + unsigned idxdepth; /* = max(hdepth, tldepth) */ + seqno_t max_drop_seq; /* samples in whc with seq <= max_drop_seq => transient-local */ + struct whc_intvnode *open_intv; /* interval where next sample will go (usually) */ + struct whc_node *maxseq_node; /* NULL if empty; if not in open_intv, open_intv is empty */ + struct nn_freelist freelist; /* struct whc_node *; linked via whc_node::next_seq */ +#if USE_EHH + struct ut_ehh *seq_hash; +#else + struct ut_hh *seq_hash; +#endif + struct ut_hh *idx_hash; + ut_avlTree_t seq; +}; + +struct whc_sample_iter_impl { + struct whc_impl *whc; + bool first; +}; + +/* check that our definition of whc_sample_iter fits in the type that callers allocate */ +struct whc_sample_iter_sizecheck { + char fits_in_generic_type[sizeof(struct whc_sample_iter_impl) <= sizeof(struct whc_sample_iter) ? 
1 : -1]; +}; + /* Avoiding all nn_log-related activities when LC_WHC is not set (and it hardly ever is, as it is not even included in "trace") saves a couple of % CPU on a high-rate publisher - that's worth @@ -47,14 +132,45 @@ static int trace_whc (const char *fmt, ...) * - cleaning up after ACKs has additional pruning stage for same case */ -static void insert_whcn_in_hash (struct whc *whc, struct whc_node *whcn); -static void whc_delete_one (struct whc *whc, struct whc_node *whcn); +static struct whc_node *whc_findseq (const struct whc_impl *whc, seqno_t seq); +static void insert_whcn_in_hash (struct whc_impl *whc, struct whc_node *whcn); +static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn); static int compare_seq (const void *va, const void *vb); -static unsigned whc_remove_acked_messages_full (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list); +static unsigned whc_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list); +static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list); +static void get_state_locked(const struct whc_impl *whc, struct whc_state *st); + +static unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list); +static void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list); +static void whc_get_state(const struct whc *whc, struct whc_state *st); +static int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata, struct tkmap_instance *tk); +static seqno_t whc_next_seq (const struct whc *whc, seqno_t seq); +static bool whc_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample); +static bool whc_borrow_sample_key (const struct whc *whc, const struct serdata *serdata_key, struct whc_borrowed_sample *sample); +static void whc_return_sample (struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info); +static unsigned whc_downgrade_to_volatile (struct whc *whc, struct whc_state *st); +static void whc_sample_iter_init (const struct whc *whc, struct whc_sample_iter *opaque_it); +static bool whc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample); +static void whc_free (struct whc *whc); static const ut_avlTreedef_t whc_seq_treedef = UT_AVL_TREEDEF_INITIALIZER (offsetof (struct whc_intvnode, avlnode), offsetof (struct whc_intvnode, min), compare_seq, 0); +static const struct whc_ops whc_ops = { + .insert = whc_insert, + .remove_acked_messages = whc_remove_acked_messages, + .free_deferred_free_list = whc_free_deferred_free_list, + .get_state = whc_get_state, + .next_seq = whc_next_seq, + .borrow_sample = whc_borrow_sample, + .borrow_sample_key = whc_borrow_sample_key, + .return_sample = whc_return_sample, + .sample_iter_init = whc_sample_iter_init, + .sample_iter_borrow_next = whc_sample_iter_borrow_next, + .downgrade_to_volatile = whc_downgrade_to_volatile, + .free = whc_free +}; + #if USE_EHH static uint32_t whc_seq_entry_hash (const void *vn) { @@ -111,7 +227,7 @@ static int compare_seq (const void *va, const void *vb) return (*a == *b) ? 0 : (*a < *b) ? 
-1 : 1; } -static struct whc_node *whc_findmax_procedurally (const struct whc *whc) +static struct whc_node *whc_findmax_procedurally (const struct whc_impl *whc) { if (whc->seq_size == 0) return NULL; @@ -128,7 +244,7 @@ static struct whc_node *whc_findmax_procedurally (const struct whc *whc) } } -static void check_whc (const struct whc *whc) +static void check_whc (const struct whc_impl *whc) { /* there's much more we can check, but it gets expensive quite quickly: all nodes but open_intv non-empty, non-overlapping and @@ -155,7 +271,7 @@ static void check_whc (const struct whc *whc) } assert (whc->maxseq_node == whc_findmax_procedurally (whc)); -#if 1 && !defined(NDEBUG) +#if !defined(NDEBUG) { struct whc_intvnode *firstintv; struct whc_node *cur; @@ -174,7 +290,7 @@ static void check_whc (const struct whc *whc) #endif } -static void insert_whcn_in_hash (struct whc *whc, struct whc_node *whcn) +static void insert_whcn_in_hash (struct whc_impl *whc, struct whc_node *whcn) { /* precondition: whcn is not in hash */ #if USE_EHH @@ -187,7 +303,7 @@ static void insert_whcn_in_hash (struct whc *whc, struct whc_node *whcn) #endif } -static void remove_whcn_from_hash (struct whc *whc, struct whc_node *whcn) +static void remove_whcn_from_hash (struct whc_impl *whc, struct whc_node *whcn) { /* precondition: whcn is in hash */ #if USE_EHH @@ -200,7 +316,7 @@ static void remove_whcn_from_hash (struct whc *whc, struct whc_node *whcn) #endif } -struct whc_node *whc_findseq (const struct whc *whc, seqno_t seq) +static struct whc_node *whc_findseq (const struct whc_impl *whc, seqno_t seq) { #if USE_EHH struct whc_seq_entry e = { .seq = seq }, *r; @@ -215,15 +331,36 @@ struct whc_node *whc_findseq (const struct whc *whc, seqno_t seq) #endif } - -struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth, size_t sample_overhead) +static struct whc_node *whc_findkey (const struct whc_impl *whc, const struct serdata *serdata_key) { - struct whc *whc; + union { + struct whc_idxnode idxn; + char pad[sizeof(struct whc_idxnode) + sizeof(struct whc_node *)]; + } template; + struct whc_idxnode *n; + check_whc (whc); + template.idxn.iid = dds_tkmap_lookup(gv.m_tkmap, serdata_key); + n = ut_hhLookup (whc->idx_hash, &template.idxn); + if (n == NULL) + return NULL; + else + { + assert (n->hist[n->headidx]); + return n->hist[n->headidx]; + } +} + +struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth) +{ + size_t sample_overhead = 80; /* INFO_TS, DATA (estimate), inline QoS */ + struct whc_impl *whc; struct whc_intvnode *intv; assert((hdepth == 0 || tldepth <= hdepth) || is_transient_local); whc = os_malloc (sizeof (*whc)); + whc->common.ops = &whc_ops; + os_mutexInit (&whc->lock); whc->is_transient_local = is_transient_local ? 
1 : 0; whc->hdepth = hdepth; whc->tldepth = tldepth; @@ -257,10 +394,10 @@ struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth, nn_freelist_init (&whc->freelist, UINT32_MAX, offsetof (struct whc_node, next_seq)); check_whc (whc); - return whc; + return (struct whc *)whc; } -static void free_whc_node_contents (struct whc *whc, struct whc_node *whcn) +static void free_whc_node_contents (struct whc_node *whcn) { ddsi_serdata_unref (whcn->serdata); if (whcn->plist) { @@ -269,9 +406,10 @@ static void free_whc_node_contents (struct whc *whc, struct whc_node *whcn) } } -void whc_free (struct whc *whc) +void whc_free (struct whc *whc_generic) { /* Freeing stuff without regards for maintaining data structures */ + struct whc_impl * const whc = (struct whc_impl *)whc_generic; check_whc (whc); if (whc->idx_hash) @@ -292,7 +430,7 @@ void whc_free (struct whc *whc) OS_WARNING_MSVC_OFF(6001); whcn = whcn->prev_seq; OS_WARNING_MSVC_ON(6001); - free_whc_node_contents (whc, tmp); + free_whc_node_contents (tmp); os_free (tmp); } } @@ -305,45 +443,41 @@ OS_WARNING_MSVC_ON(6001); #else ut_hhFree (whc->seq_hash); #endif + os_mutexDestroy (&whc->lock); os_free (whc); } -int whc_empty (const struct whc *whc) +static void get_state_locked(const struct whc_impl *whc, struct whc_state *st) { - return whc->seq_size == 0; -} - -seqno_t whc_min_seq (const struct whc *whc) -{ - /* precond: whc not empty */ - const struct whc_intvnode *intv; - check_whc (whc); - assert (!whc_empty (whc)); - intv = ut_avlFindMin (&whc_seq_treedef, &whc->seq); - assert (intv); - /* not empty, open node may be anything but is (by definition) + if (whc->seq_size == 0) + { + st->min_seq = st->max_seq = -1; + st->unacked_bytes = 0; + } + else + { + const struct whc_intvnode *intv; + intv = ut_avlFindMin (&whc_seq_treedef, &whc->seq); + /* not empty, open node may be anything but is (by definition) findmax, and whc is claimed to be non-empty, so min interval can't be empty */ - assert (intv->maxp1 > intv->min); - return intv->min; + assert (intv->maxp1 > intv->min); + st->min_seq = intv->min; + st->max_seq = whc->maxseq_node->seq; + st->unacked_bytes = whc->unacked_bytes; + } } -struct whc_node *whc_findmax (const struct whc *whc) +static void whc_get_state(const struct whc *whc_generic, struct whc_state *st) { + const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; + os_mutexLock ((struct os_mutex *)&whc->lock); check_whc (whc); - return (struct whc_node *) whc->maxseq_node; + get_state_locked(whc, st); + os_mutexUnlock ((struct os_mutex *)&whc->lock); } -seqno_t whc_max_seq (const struct whc *whc) -{ - /* precond: whc not empty */ - check_whc (whc); - assert (!whc_empty (whc)); - assert (whc->maxseq_node != NULL); - return whc->maxseq_node->seq; -} - -static struct whc_node *find_nextseq_intv (struct whc_intvnode **p_intv, const struct whc *whc, seqno_t seq) +static struct whc_node *find_nextseq_intv (struct whc_intvnode **p_intv, const struct whc_impl *whc, seqno_t seq) { struct whc_node *n; struct whc_intvnode *intv; @@ -385,25 +519,23 @@ static struct whc_node *find_nextseq_intv (struct whc_intvnode **p_intv, const s } } -seqno_t whc_next_seq (const struct whc *whc, seqno_t seq) +static seqno_t whc_next_seq (const struct whc *whc_generic, seqno_t seq) { + const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; struct whc_node *n; struct whc_intvnode *intv; + seqno_t nseq; + os_mutexLock ((struct os_mutex *)&whc->lock); check_whc (whc); if ((n = find_nextseq_intv (&intv, whc, 
seq)) == NULL) - return MAX_SEQ_NUMBER; + nseq = MAX_SEQ_NUMBER; else - return n->seq; + nseq = n->seq; + os_mutexUnlock ((struct os_mutex *)&whc->lock); + return nseq; } -struct whc_node* whc_next_node(const struct whc *whc, seqno_t seq) -{ - struct whc_intvnode *intv; - check_whc (whc); - return find_nextseq_intv(&intv, whc, seq); -} - -static void delete_one_sample_from_idx (struct whc *whc, struct whc_node *whcn) +static void delete_one_sample_from_idx (struct whc_impl *whc, struct whc_node *whcn) { struct whc_idxnode * const idxn = whcn->idxnode; assert (idxn != NULL); @@ -426,7 +558,7 @@ static void delete_one_sample_from_idx (struct whc *whc, struct whc_node *whcn) whcn->idxnode = NULL; } -static void free_one_instance_from_idx (struct whc *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn) +static void free_one_instance_from_idx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn) { unsigned i; for (i = 0; i < whc->idxdepth; i++) @@ -446,14 +578,14 @@ static void free_one_instance_from_idx (struct whc *whc, seqno_t max_drop_seq, s os_free(idxn); } -static void delete_one_instance_from_idx (struct whc *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn) +static void delete_one_instance_from_idx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn) { if (!ut_hhRemove (whc->idx_hash, idxn)) assert (0); free_one_instance_from_idx (whc, max_drop_seq, idxn); } -static int whcn_in_tlidx (const struct whc *whc, const struct whc_idxnode *idxn, unsigned pos) +static int whcn_in_tlidx (const struct whc_impl *whc, const struct whc_idxnode *idxn, unsigned pos) { if (idxn == NULL) return 0; @@ -465,19 +597,24 @@ static int whcn_in_tlidx (const struct whc *whc, const struct whc_idxnode *idxn, } } -void whc_downgrade_to_volatile (struct whc *whc) +static unsigned whc_downgrade_to_volatile (struct whc *whc_generic, struct whc_state *st) { + struct whc_impl * const whc = (struct whc_impl *)whc_generic; seqno_t old_max_drop_seq; struct whc_node *deferred_free_list; + unsigned cnt; /* We only remove them from whc->tlidx: we don't remove them from whc->seq yet. That'll happen eventually. */ + os_mutexLock (&whc->lock); check_whc (whc); if (whc->idxdepth == 0) { /* if not maintaining an index at all, this is nonsense */ - return; + get_state_locked(whc, st); + os_mutexUnlock (&whc->lock); + return 0; } assert (!whc->is_transient_local); @@ -502,18 +639,21 @@ void whc_downgrade_to_volatile (struct whc *whc) them all. 
*/ old_max_drop_seq = whc->max_drop_seq; whc->max_drop_seq = 0; - whc_remove_acked_messages_full (whc, old_max_drop_seq, &deferred_free_list); - whc_free_deferred_free_list (whc, deferred_free_list); + cnt = whc_remove_acked_messages_full (whc, old_max_drop_seq, &deferred_free_list); + whc_free_deferred_free_list (whc_generic, deferred_free_list); assert (whc->max_drop_seq == old_max_drop_seq); + get_state_locked(whc, st); + os_mutexUnlock (&whc->lock); + return cnt; } -static size_t whcn_size (const struct whc *whc, const struct whc_node *whcn) +static size_t whcn_size (const struct whc_impl *whc, const struct whc_node *whcn) { size_t sz = ddsi_serdata_size (whcn->serdata); return sz + ((sz + config.fragment_size - 1) / config.fragment_size) * whc->sample_overhead; } -static void whc_delete_one_intv (struct whc *whc, struct whc_intvnode **p_intv, struct whc_node **p_whcn) +static void whc_delete_one_intv (struct whc_impl *whc, struct whc_intvnode **p_intv, struct whc_node **p_whcn) { /* Removes *p_whcn, possibly deleting or splitting *p_intv, as the case may be. Does *NOT* update whc->seq_size. *p_intv must be @@ -612,7 +752,7 @@ static void whc_delete_one_intv (struct whc *whc, struct whc_intvnode **p_intv, } } -static void whc_delete_one (struct whc *whc, struct whc_node *whcn) +static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn) { struct whc_intvnode *intv; struct whc_node *whcn_tmp = whcn; @@ -624,11 +764,11 @@ static void whc_delete_one (struct whc *whc, struct whc_node *whcn) if (whcn_tmp->next_seq) whcn_tmp->next_seq->prev_seq = whcn_tmp->prev_seq; whcn_tmp->next_seq = NULL; - whc_free_deferred_free_list (whc, whcn_tmp); + free_deferred_free_list (whc, whcn_tmp); whc->seq_size--; } -void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list) +static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list) { if (deferred_free_list) { @@ -637,7 +777,8 @@ void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_fre for (cur = deferred_free_list, last = NULL; cur; last = cur, cur = cur->next_seq) { n++; - free_whc_node_contents (whc, cur); + if (!cur->borrowed) + free_whc_node_contents (cur); } cur = nn_freelist_pushmany (&whc->freelist, deferred_free_list, last, n); while (cur) @@ -649,7 +790,13 @@ void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_fre } } -static unsigned whc_remove_acked_messages_noidx (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) +static void whc_free_deferred_free_list (struct whc *whc_generic, struct whc_node *deferred_free_list) +{ + struct whc_impl * const whc = (struct whc_impl *)whc_generic; + free_deferred_free_list(whc, deferred_free_list); +} + +static unsigned whc_remove_acked_messages_noidx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) { struct whc_intvnode *intv; struct whc_node *whcn; @@ -725,7 +872,7 @@ static unsigned whc_remove_acked_messages_noidx (struct whc *whc, seqno_t max_dr return ndropped; } -static unsigned whc_remove_acked_messages_full (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) +static unsigned whc_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) { struct whc_intvnode *intv; struct whc_node *whcn; @@ -859,49 +1006,36 @@ static unsigned whc_remove_acked_messages_full (struct whc *whc, seqno_t max_dro return ndropped; } -unsigned 
whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) +static unsigned whc_remove_acked_messages (struct whc *whc_generic, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list) { + struct whc_impl * const whc = (struct whc_impl *)whc_generic; + unsigned cnt; + + os_mutexLock (&whc->lock); assert (max_drop_seq < MAX_SEQ_NUMBER); assert (max_drop_seq >= whc->max_drop_seq); - TRACE_WHC(("whc_remove_acked_messages(%p max_drop_seq %"PRId64")\n", (void *)whc, max_drop_seq)); - TRACE_WHC((" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %u tl %u\n", - whc_empty(whc) ? (seqno_t)-1 : whc_min_seq(whc), - whc_empty(whc) ? (seqno_t)-1 : whc_max_seq(whc), - whc->max_drop_seq, whc->hdepth, whc->tldepth)); + if (config.enabled_logcats & LC_WHC) + { + struct whc_state whcst; + get_state_locked(whc, &whcst); + TRACE_WHC(("whc_remove_acked_messages(%p max_drop_seq %"PRId64")\n", (void *)whc, max_drop_seq)); + TRACE_WHC((" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %u tl %u\n", + whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth)); + } check_whc (whc); if (whc->idxdepth == 0) - { - return whc_remove_acked_messages_noidx (whc, max_drop_seq, deferred_free_list); - } + cnt = whc_remove_acked_messages_noidx (whc, max_drop_seq, deferred_free_list); else - { - return whc_remove_acked_messages_full (whc, max_drop_seq, deferred_free_list); - } + cnt = whc_remove_acked_messages_full (whc, max_drop_seq, deferred_free_list); + get_state_locked(whc, whcst); + os_mutexUnlock (&whc->lock); + return cnt; } -struct whc_node *whc_findkey (const struct whc *whc, const struct serdata *serdata_key) -{ - union { - struct whc_idxnode idxn; - char pad[sizeof(struct whc_idxnode) + sizeof(struct whc_node *)]; - } template; - struct whc_idxnode *n; - check_whc (whc); - template.idxn.iid = dds_tkmap_lookup(gv.m_tkmap, serdata_key); - n = ut_hhLookup (whc->idx_hash, &template.idxn); - if (n == NULL) - return NULL; - else - { - assert (n->hist[n->headidx]); - return n->hist[n->headidx]; - } -} - -static struct whc_node *whc_insert_seq (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata) +static struct whc_node *whc_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata) { struct whc_node *newn = NULL; @@ -910,6 +1044,7 @@ static struct whc_node *whc_insert_seq (struct whc *whc, seqno_t max_drop_seq, s newn->seq = seq; newn->plist = plist; newn->unacked = (seq > max_drop_seq); + newn->borrowed = 0; newn->idxnode = NULL; /* initial state, may be changed */ newn->idxnode_pos = 0; newn->last_rexmit_ts.v = 0; @@ -961,21 +1096,27 @@ static struct whc_node *whc_insert_seq (struct whc *whc, seqno_t max_drop_seq, s return newn; } -int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata, struct tkmap_instance *tk) +static int whc_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata, struct tkmap_instance *tk) { + struct whc_impl * const whc = (struct whc_impl *)whc_generic; struct whc_node *newn = NULL; struct whc_idxnode *idxn; union { struct whc_idxnode idxn; char pad[sizeof(struct whc_idxnode) + sizeof(struct whc_node *)]; } template; + + os_mutexLock (&whc->lock); check_whc (whc); - TRACE_WHC(("whc_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%x)\n", (void *)whc, max_drop_seq, seq, (void*)plist, 
(void*)serdata, *(unsigned *)serdata->v.keyhash.m_hash)); - TRACE_WHC((" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %u tl %u\n", - whc_empty(whc) ? (seqno_t)-1 : whc_min_seq(whc), - whc_empty(whc) ? (seqno_t)-1 : whc_max_seq(whc), - whc->max_drop_seq, whc->hdepth, whc->tldepth)); + if (config.enabled_logcats & LC_WHC) + { + struct whc_state whcst; + get_state_locked(whc, &whcst); + TRACE_WHC(("whc_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%x)\n", (void *)whc, max_drop_seq, seq, (void*)plist, (void*)serdata, *(unsigned *)serdata->v.keyhash.m_hash)); + TRACE_WHC((" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %u tl %u\n", + whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth)); + } assert (max_drop_seq < MAX_SEQ_NUMBER); assert (max_drop_seq >= whc->max_drop_seq); @@ -983,7 +1124,7 @@ int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_pl /* Seq must be greater than what is currently stored. Usually it'll be the next sequence number, but if there are no readers temporarily, a gap may be among the possibilities */ - assert (whc_empty (whc) || seq > whc_max_seq (whc)); + assert (whc->seq_size == 0 || seq > whc->maxseq_node->seq); /* Always insert in seq admin */ newn = whc_insert_seq (whc, max_drop_seq, seq, plist, serdata); @@ -994,6 +1135,7 @@ int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_pl if (ddsi_serdata_is_empty(serdata) || whc->idxdepth == 0) { TRACE_WHC((" empty or no hist\n")); + os_mutexUnlock (&whc->lock); return 0; } @@ -1089,10 +1231,123 @@ int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_pl } TRACE_WHC(("\n")); } + os_mutexUnlock (&whc->lock); return 0; } -size_t whc_unacked_bytes (struct whc *whc) +static void make_borrowed_sample(struct whc_borrowed_sample *sample, struct whc_node *whcn) { - return whc->unacked_bytes; + assert(!whcn->borrowed); + whcn->borrowed = 1; + sample->seq = whcn->seq; + sample->plist = whcn->plist; + sample->serdata = whcn->serdata; + sample->unacked = whcn->unacked; + sample->rexmit_count = whcn->rexmit_count; + sample->last_rexmit_ts = whcn->last_rexmit_ts; +} + +static bool whc_borrow_sample (const struct whc *whc_generic, seqno_t seq, struct whc_borrowed_sample *sample) +{ + const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; + struct whc_node *whcn; + bool found; + os_mutexLock ((os_mutex *)&whc->lock); + if ((whcn = whc_findseq(whc, seq)) == NULL) + found = false; + else + { + make_borrowed_sample(sample, whcn); + found = true; + } + os_mutexUnlock ((os_mutex *)&whc->lock); + return found; +} + +static bool whc_borrow_sample_key (const struct whc *whc_generic, const struct serdata *serdata_key, struct whc_borrowed_sample *sample) +{ + const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; + struct whc_node *whcn; + bool found; + os_mutexLock ((os_mutex *)&whc->lock); + if ((whcn = whc_findkey(whc, serdata_key)) == NULL) + found = false; + else + { + make_borrowed_sample(sample, whcn); + found = true; + } + os_mutexUnlock ((os_mutex *)&whc->lock); + return found; +} + +static void return_sample_locked (struct whc_impl *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info) +{ + struct whc_node *whcn; + if ((whcn = whc_findseq (whc, sample->seq)) == NULL) + { + /* data no longer present in WHC - that means ownership for serdata, plist shifted to the borrowed copy and "returning" it really becomes "destroying" it */ + ddsi_serdata_unref 
(sample->serdata); + if (sample->plist) { + nn_plist_fini (sample->plist); + os_free (sample->plist); + } + } + else + { + assert(whcn->borrowed); + whcn->borrowed = 0; + if (update_retransmit_info) + { + whcn->rexmit_count = sample->rexmit_count; + whcn->last_rexmit_ts = sample->last_rexmit_ts; + } + } +} + +static void whc_return_sample (struct whc *whc_generic, struct whc_borrowed_sample *sample, bool update_retransmit_info) +{ + struct whc_impl * const whc = (struct whc_impl *)whc_generic; + os_mutexLock (&whc->lock); + return_sample_locked (whc, sample, update_retransmit_info); + os_mutexUnlock (&whc->lock); +} + +static void whc_sample_iter_init (const struct whc *whc_generic, struct whc_sample_iter *opaque_it) +{ + const struct whc_impl * const whc = (const struct whc_impl *)whc_generic; + struct whc_sample_iter_impl *it = (struct whc_sample_iter_impl *)opaque_it->opaque.opaque; + it->whc = (struct whc_impl *)whc; + it->first = true; +} + +static bool whc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample) +{ + struct whc_sample_iter_impl * const it = (struct whc_sample_iter_impl *)opaque_it->opaque.opaque; + struct whc_impl * const whc = it->whc; + struct whc_node *whcn; + struct whc_intvnode *intv; + seqno_t seq; + bool valid; + os_mutexLock (&whc->lock); + check_whc (whc); + if (!it->first) + { + seq = sample->seq; + return_sample_locked(whc, sample, false); + } + else + { + it->first = false; + seq = 0; + } + if ((whcn = find_nextseq_intv (&intv, whc, seq)) == NULL) + valid = false; + else + { + make_borrowed_sample(sample, whcn); + valid = true; + } + os_mutexUnlock (&whc->lock); + return valid; } diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index 5869cbd..44a8807 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -21,6 +21,7 @@ #include "dds__err.h" #include "dds__init.h" #include "dds__tkmap.h" +#include "dds__whc.h" #include "dds__report.h" #include "ddsc/ddsc_project.h" @@ -233,6 +234,8 @@ dds_writer_delete( assert(e); assert(thr); + /* FIXME: not freeing WHC here because it is owned by the DDSI entity */ + if (asleep) { thread_state_awake(thr); } @@ -338,6 +341,62 @@ dds_writer_qos_set( return ret; } +static struct whc *make_whc(const dds_qos_t *qos) +{ + bool startup_mode; + bool handle_as_transient_local; + unsigned hdepth, tldepth; + /* Startup mode causes the writer to treat data in its WHC as if + transient-local, for the first few seconds after startup of the + DDSI service. It is done for volatile reliable writers only + (which automatically excludes all builtin writers) or for all + writers except volatile best-effort & transient-local ones. + + Which one to use depends on whether merge policies are in effect + in durability. If yes, then durability will take care of all + transient & persistent data; if no, DDSI discovery usually takes + too long and this'll save you. 
+ + Note: may still be cleared, if it turns out we are not maintaining + an index at all (e.g., volatile KEEP_ALL) */ + if (config.startup_mode_full) { + startup_mode = gv.startup_mode && + (qos->durability.kind >= DDS_DURABILITY_TRANSIENT || + (qos->durability.kind == DDS_DURABILITY_VOLATILE && + qos->reliability.kind != DDS_RELIABILITY_BEST_EFFORT)); + } else { + startup_mode = gv.startup_mode && + (qos->durability.kind == DDS_DURABILITY_VOLATILE && + qos->reliability.kind != DDS_RELIABILITY_BEST_EFFORT); + } + + /* Construct WHC -- if aggressive_keep_last1 is set, the WHC will + drop all samples for which a later update is available. This + forces it to maintain a tlidx. */ + handle_as_transient_local = (qos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL); + if (!config.aggressive_keep_last_whc || qos->history.kind == DDS_HISTORY_KEEP_ALL) + hdepth = 0; + else + hdepth = (unsigned)qos->history.depth; + if (handle_as_transient_local) { + if (qos->durability_service.history.kind == DDS_HISTORY_KEEP_ALL) + tldepth = 0; + else + tldepth = (unsigned)qos->durability_service.history.depth; + } else if (startup_mode) { + tldepth = (hdepth == 0) ? 1 : hdepth; + } else { + tldepth = 0; + } + if (hdepth == 0 && tldepth == 0) + { + /* no index at all - so no need to bother with startup mode */ + startup_mode = 0; + } + + return whc_new (handle_as_transient_local, hdepth, tldepth); +} + _Pre_satisfies_(((participant_or_publisher & DDS_ENTITY_KIND_MASK) == DDS_KIND_PUBLISHER) || \ ((participant_or_publisher & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT)) @@ -426,6 +485,7 @@ dds_create_writer( wr->m_entity.m_deriver.set_qos = dds_writer_qos_set; wr->m_entity.m_deriver.validate_status = dds_writer_status_validate; wr->m_entity.m_deriver.get_instance_hdl = dds_writer_instance_hdl; + wr->m_whc = make_whc (wqos); /* Extra claim of this writer to make sure that the delete waits until DDSI * has deleted its writer as well. This can be known through the callback. */ @@ -439,8 +499,7 @@ dds_create_writer( if (asleep) { thread_state_awake(thr); } - wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_participant->m_guid, ((dds_topic*)tp)->m_stopic, - wqos, dds_writer_status_cb, wr); + wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_participant->m_guid, ((dds_topic*)tp)->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr); os_mutexLock(&pub->m_mutex); os_mutexLock(&tp->m_mutex); assert(wr->m_wr); diff --git a/src/core/ddsi/.fileids b/src/core/ddsi/.fileids index a765e76..812b062 100644 --- a/src/core/ddsi/.fileids +++ b/src/core/ddsi/.fileids @@ -33,7 +33,6 @@ 32 src/q_thread_inlines.c 33 src/q_time.c 34 src/q_transmit.c -35 src/q_whc.c 36 src/q_xevent.c 37 src/q_xmsg.c 38 src/q_freelist.c diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index e284eb0..95f0776 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -50,7 +50,6 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" q_time.c q_transmit.c q_inverse_uint32_set.c - q_whc.c q_xevent.c q_xmsg.c q_freelist.c diff --git a/src/core/ddsi/include/ddsi/q_entity.h b/src/core/ddsi/include/ddsi/q_entity.h index 635f259..ed6b328 100644 --- a/src/core/ddsi/include/ddsi/q_entity.h +++ b/src/core/ddsi/include/ddsi/q_entity.h @@ -474,34 +474,15 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity GUID "ppguid". May return NULL if participant unknown or writer/reader already known. 
*/ -struct writer * new_writer -( - struct nn_guid *wrguid, - const struct nn_guid *group_guid, - const struct nn_guid *ppguid, - const struct sertopic *topic, - const struct nn_xqos *xqos, - status_cb_t status_cb, - void * status_cb_arg -); +struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg); -struct reader * new_reader -( - struct nn_guid *rdguid, - const struct nn_guid *group_guid, - const struct nn_guid *ppguid, - const struct sertopic *topic, - const struct nn_xqos *xqos, - struct rhc * rhc, - status_cb_t status_cb, - void * status_cb_arg -); +struct reader * new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void * status_cb_arg); struct whc_node; -unsigned remove_acked_messages (struct writer *wr, struct whc_node **deferred_free_list); -unsigned remove_acked_messages_and_free (struct writer *wr); +struct whc_state; +unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, struct whc_node **deferred_free_list); seqno_t writer_max_drop_seq (const struct writer *wr); -int writer_must_have_hb_scheduled (const struct writer *wr); +int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst); void writer_set_retransmitting (struct writer *wr); void writer_clear_retransmitting (struct writer *wr); diff --git a/src/core/ddsi/include/ddsi/q_hbcontrol.h b/src/core/ddsi/include/ddsi/q_hbcontrol.h index 72521d9..e171bfd 100644 --- a/src/core/ddsi/include/ddsi/q_hbcontrol.h +++ b/src/core/ddsi/include/ddsi/q_hbcontrol.h @@ -17,6 +17,7 @@ extern "C" { #endif struct writer; +struct whc_state; struct hbcontrol { nn_mtime_t t_of_last_write; @@ -28,12 +29,12 @@ struct hbcontrol { }; void writer_hbcontrol_init (struct hbcontrol *hbc); -int64_t writer_hbcontrol_intv (const struct writer *wr, nn_mtime_t tnow); -void writer_hbcontrol_note_asyncwrite (struct writer *wr, nn_mtime_t tnow); -int writer_hbcontrol_ack_required (const struct writer *wr, nn_mtime_t tnow); -struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, nn_mtime_t tnow, unsigned packetid, int *hbansreq); -int writer_hbcontrol_must_send (const struct writer *wr, nn_mtime_t tnow); -struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, nn_mtime_t tnow, int hbansreq, int issync); +int64_t writer_hbcontrol_intv (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow); +void writer_hbcontrol_note_asyncwrite (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow); +int writer_hbcontrol_ack_required (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow); +struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, unsigned packetid, int *hbansreq); +int writer_hbcontrol_must_send (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow); +struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, int hbansreq, int issync); #if defined (__cplusplus) } diff --git a/src/core/ddsi/include/ddsi/q_transmit.h b/src/core/ddsi/include/ddsi/q_transmit.h index 0070000..b3ba762 100644 --- a/src/core/ddsi/include/ddsi/q_transmit.h +++ b/src/core/ddsi/include/ddsi/q_transmit.h @@ -22,6 
+22,7 @@ extern "C" { struct nn_xpack; struct nn_xmsg; struct writer; +struct whc_state; struct proxy_reader; struct serdata; struct tkmap_instance; @@ -41,7 +42,7 @@ int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, struct serda /* When calling the following functions, wr->lock must be held */ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew); int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct serdata *serdata, struct proxy_reader *prd, int isnew); -void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, int hbansreq, nn_entityid_t dst, int issync); +void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, nn_entityid_t dst, int issync); #if defined (__cplusplus) } diff --git a/src/core/ddsi/include/ddsi/q_whc.h b/src/core/ddsi/include/ddsi/q_whc.h index 8045087..7e7de47 100644 --- a/src/core/ddsi/include/ddsi/q_whc.h +++ b/src/core/ddsi/include/ddsi/q_whc.h @@ -12,108 +12,83 @@ #ifndef Q_WHC_H #define Q_WHC_H -#include "util/ut_avl.h" -#include "util/ut_hopscotch.h" -#include "ddsi/q_time.h" -#include "ddsi/q_rtps.h" -#include "ddsi/q_freelist.h" - #if defined (__cplusplus) extern "C" { #endif struct serdata; struct nn_plist; -struct whc_idxnode; +struct tkmap_instance; +struct whc_node; /* opaque, but currently used for deferred free lists */ +struct whc; -#define USE_EHH 0 - -struct whc_node { - struct whc_node *next_seq; /* next in this interval */ - struct whc_node *prev_seq; /* prev in this interval */ - struct whc_idxnode *idxnode; /* NULL if not in index */ - unsigned idxnode_pos; /* index in idxnode.hist */ +struct whc_borrowed_sample { seqno_t seq; - uint64_t total_bytes; /* cumulative number of bytes up to and including this node */ - size_t size; - struct nn_plist *plist; /* 0 if nothing special */ - unsigned unacked: 1; /* counted in whc::unacked_bytes iff 1 */ + struct serdata *serdata; + struct nn_plist *plist; + bool unacked; nn_mtime_t last_rexmit_ts; unsigned rexmit_count; - struct serdata *serdata; }; -struct whc_intvnode { - ut_avlNode_t avlnode; - seqno_t min; - seqno_t maxp1; - struct whc_node *first; /* linked list of seqs with contiguous sequence numbers [min,maxp1) */ - struct whc_node *last; /* valid iff first != NULL */ -}; - -struct whc_idxnode { - int64_t iid; - seqno_t prune_seq; - struct tkmap_instance *tk; - unsigned headidx; -#if __STDC_VERSION__ >= 199901L - struct whc_node *hist[]; -#else - struct whc_node *hist[1]; -#endif -}; - -#if USE_EHH -struct whc_seq_entry { - seqno_t seq; - struct whc_node *whcn; -}; -#endif - -struct whc { - unsigned seq_size; +struct whc_state { + seqno_t min_seq; /* -1 if WHC empty, else > 0 */ + seqno_t max_seq; /* -1 if WHC empty, else >= min_seq */ size_t unacked_bytes; - size_t sample_overhead; - uint64_t total_bytes; /* total number of bytes pushed in */ - unsigned is_transient_local: 1; - unsigned hdepth; /* 0 = unlimited */ - unsigned tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */ - unsigned idxdepth; /* = max(hdepth, tldepth) */ - seqno_t max_drop_seq; /* samples in whc with seq <= max_drop_seq => transient-local */ - struct whc_intvnode *open_intv; /* interval where next sample will go (usually) */ - struct whc_node *maxseq_node; /* NULL if empty; if not in open_intv, open_intv is empty */ - 
struct nn_freelist freelist; /* struct whc_node *; linked via whc_node::next_seq */ -#if USE_EHH - struct ut_ehh *seq_hash; -#else - struct ut_hh *seq_hash; -#endif - struct ut_hh *idx_hash; - ut_avlTree_t seq; +}; +#define WHCST_ISEMPTY(whcst) ((whcst)->max_seq == -1) + +/* Adjust SIZE and alignment stuff as needed: they are here simply so we can allocate + an iter on the stack without specifying an implementation. If future changes or + implementations require more, these can be adjusted. An implementation should check + things fit at compile time. */ +#define WHC_SAMPLE_ITER_SIZE (2*sizeof(void *)) +struct whc_sample_iter { + union { + char opaque[WHC_SAMPLE_ITER_SIZE]; + /* cover alignment requirements: */ + uint64_t x; + double y; + void *p; + } opaque; }; -struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth, size_t sample_overhead); -void whc_free (struct whc *whc); -int whc_empty (const struct whc *whc); -seqno_t whc_min_seq (const struct whc *whc); -seqno_t whc_max_seq (const struct whc *whc); -seqno_t whc_next_seq (const struct whc *whc, seqno_t seq); -size_t whc_unacked_bytes (struct whc *whc); - -struct whc_node *whc_findseq (const struct whc *whc, seqno_t seq); -struct whc_node *whc_findmax (const struct whc *whc); -struct whc_node *whc_findkey (const struct whc *whc, const struct serdata *serdata_key); - -struct whc_node *whc_next_node (const struct whc *whc, seqno_t seq); +typedef seqno_t (*whc_next_seq_t)(const struct whc *whc, seqno_t seq); +typedef void (*whc_get_state_t)(const struct whc *whc, struct whc_state *st); +typedef bool (*whc_borrow_sample_t)(const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample); +typedef bool (*whc_borrow_sample_key_t)(const struct whc *whc, const struct serdata *serdata_key, struct whc_borrowed_sample *sample); +typedef void (*whc_return_sample_t)(struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info); +typedef void (*whc_sample_iter_init_t)(const struct whc *whc, struct whc_sample_iter *it); +typedef bool (*whc_sample_iter_borrow_next_t)(struct whc_sample_iter *it, struct whc_borrowed_sample *sample); +typedef void (*whc_free_t)(struct whc *whc); /* min_seq is lowest sequence number that must be retained because of reliable readers that have not acknowledged all data */ /* max_drop_seq must go soon, it's way too ugly. 
*/ /* plist may be NULL or os_malloc'd, WHC takes ownership of plist */ -int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct serdata *serdata, struct tkmap_instance *tk); -void whc_downgrade_to_volatile (struct whc *whc); -unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list); -void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list); +typedef int (*whc_insert_t)(struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct serdata *serdata, struct tkmap_instance *tk); +typedef unsigned (*whc_downgrade_to_volatile_t)(struct whc *whc, struct whc_state *st); +typedef unsigned (*whc_remove_acked_messages_t)(struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list); +typedef void (*whc_free_deferred_free_list_t)(struct whc *whc, struct whc_node *deferred_free_list); + +struct whc_ops { + whc_insert_t insert; + whc_remove_acked_messages_t remove_acked_messages; + whc_free_deferred_free_list_t free_deferred_free_list; + whc_get_state_t get_state; + whc_next_seq_t next_seq; + whc_borrow_sample_t borrow_sample; + whc_borrow_sample_key_t borrow_sample_key; + whc_return_sample_t return_sample; + whc_sample_iter_init_t sample_iter_init; + whc_sample_iter_borrow_next_t sample_iter_borrow_next; + whc_downgrade_to_volatile_t downgrade_to_volatile; + whc_free_t free; +}; + +struct whc { + const struct whc_ops *ops; +}; #if defined (__cplusplus) } diff --git a/src/core/ddsi/src/q_debmon.c b/src/core/ddsi/src/q_debmon.c index 3005a7a..94f22bf 100644 --- a/src/core/ddsi/src/q_debmon.c +++ b/src/core/ddsi/src/q_debmon.c @@ -22,7 +22,6 @@ #include "ddsi/q_time.h" #include "ddsi/q_misc.h" #include "ddsi/q_log.h" -#include "ddsi/q_whc.h" #include "ddsi/q_plist.h" #include "q__osplser.h" #include "ddsi/q_ephash.h" @@ -39,7 +38,7 @@ #include "ddsi/ddsi_tcp.h" #include "ddsi/sysdeps.h" - +#include "dds__whc.h" struct plugin { debug_monitor_plugin_t fn; @@ -185,14 +184,14 @@ static int print_participants (struct thread_state1 *self, ddsi_tran_conn_t conn { ut_avlIter_t rdit; struct wr_prd_match *m; + struct whc_state whcst; if (w->c.pp != p) continue; os_mutexLock (&w->e.lock); print_endpoint_common (conn, "wr", &w->e, &w->c, w->xqos, w->topic); + w->whc->ops->get_state(w->whc, &whcst); x += cpf (conn, " whc [%lld,%lld] unacked %"PRIuSIZE"%s [%u,%u] seq %lld seq_xmit %lld cs_seq %lld\n", - whc_empty (w->whc) ? -1 : whc_min_seq (w->whc), - whc_empty (w->whc) ? -1 : whc_max_seq (w->whc), - whc_unacked_bytes (w->whc), + whcst.min_seq, whcst.max_seq, whcst.unacked_bytes, w->throttling ? 
" THROTTLING" : "", w->whc_low, w->whc_high, w->seq, READ_SEQ_XMIT(w), w->cs_seq); diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index c63ad23..933c540 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -22,7 +22,6 @@ #include "ddsi/q_misc.h" #include "ddsi/q_log.h" #include "util/ut_avl.h" -#include "ddsi/q_whc.h" #include "ddsi/q_plist.h" #include "ddsi/q_lease.h" #include "q__osplser.h" @@ -42,6 +41,7 @@ #include "ddsi/ddsi_mcgroup.h" #include "ddsi/sysdeps.h" +#include "dds__whc.h" struct deleted_participant { ut_avlNode_t avlnode; @@ -84,27 +84,8 @@ static const unsigned prismtech_builtin_writers_besmask = NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER | NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER; -static struct writer * new_writer_guid -( - const struct nn_guid *guid, - const struct nn_guid *group_guid, - struct participant *pp, - const struct sertopic *topic, - const struct nn_xqos *xqos, - status_cb_t status_cb, - void *status_cbarg -); -static struct reader * new_reader_guid -( - const struct nn_guid *guid, - const struct nn_guid *group_guid, - struct participant *pp, - const struct sertopic *topic, - const struct nn_xqos *xqos, - struct rhc *rhc, - status_cb_t status_cb, - void *status_cbarg -); +static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg); +static struct reader * new_reader_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg); static struct participant *ref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity); static void unref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity); static void delete_proxy_group_locked (struct proxy_group *pgroup, nn_wctime_t timestamp, int isimplicit); @@ -466,7 +447,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS)) { subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, LAST_WR_PARAMS); + new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, whc_new(1, 1, 1), LAST_WR_PARAMS); /* But we need the as_disc address set for SPDP, because we need to send it to everyone regardless of the existence of readers. 
*/ { @@ -489,23 +470,23 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS)) { subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS); + new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS); + new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS); + new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PARTICIPANT_WRITER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PUBLISHER_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS); + new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_SUBSCRIBER_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS); + new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER; } @@ -513,7 +494,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis { /* TODO: make this one configurable, we don't want all participants to publish all topics (or even just those that they use themselves) */ subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS); + new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_TOPIC_ANNOUNCER; } @@ -521,7 +502,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS)) { subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS); + new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER; } @@ -1311,17 +1292,18 @@ static void free_wr_rd_match (struct wr_rd_match *m) static void writer_drop_connection (const struct nn_guid * wr_guid, const struct proxy_reader * prd) { - struct whc_node *deferred_free_list; struct writer *wr; if ((wr = ephash_lookup_writer_guid (wr_guid)) != NULL) { + struct whc_node *deferred_free_list = NULL; struct wr_prd_match *m; os_mutexLock (&wr->e.lock); if ((m = ut_avlLookup (&wr_readers_treedef, &wr->readers, &prd->e.guid)) != NULL) { + struct whc_state 
whcst; ut_avlDelete (&wr_readers_treedef, &wr->readers, m); rebuild_writer_addrset (wr); - remove_acked_messages (wr, &deferred_free_list); + remove_acked_messages (wr, &whcst, &deferred_free_list); wr->num_reliable_readers -= m->is_reliable; if (wr->status_cb) { @@ -1333,6 +1315,7 @@ static void writer_drop_connection (const struct nn_guid * wr_guid, const struct } } os_mutexUnlock (&wr->e.lock); + wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list); free_wr_prd_match (m); } } @@ -1616,20 +1599,23 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd) ut_avlInsertIPath (&wr_local_readers_treedef, &wr->local_readers, m, &path); local_reader_ary_insert (&wr->rdary, rd); - /* Store available data into the late joining reader when it is reliable. */ - if ((rd->xqos->reliability.kind > NN_BEST_EFFORT_RELIABILITY_QOS) /* transient reader */ && - (!whc_empty(wr->whc)) /* data available */ ) + /* Store available data into the late joining reader when it is reliable (we don't do + historical data for best-effort data over the wire, so also not locally). + FIXME: should limit ourselves to what it is available because of durability history, + not writer history */ + if (rd->xqos->reliability.kind > NN_BEST_EFFORT_RELIABILITY_QOS && rd->xqos->durability.kind > NN_VOLATILE_DURABILITY_QOS) { - seqno_t seq = -1; - struct whc_node *n; - while ((n = whc_next_node(wr->whc, seq)) != NULL) + struct whc_sample_iter it; + struct whc_borrowed_sample sample; + wr->whc->ops->sample_iter_init(wr->whc, &it); + while (wr->whc->ops->sample_iter_borrow_next(&it, &sample)) { struct proxy_writer_info pwr_info; - serdata_t payload = n->serdata; + serdata_t payload = sample.serdata; + /* FIXME: whc has tk reference in its index nodes, which is what we really should be iterating over anyway, and so we don't really have to look them up anymore */ struct tkmap_instance *tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (payload); make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); (void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk); - seq = n->seq; } } @@ -1645,7 +1631,6 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd) data.handle = rd->e.iid; (wr->status_cb) (wr->status_cb_entity, &data); } - } static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, nn_count_t *init_count) @@ -2502,9 +2487,9 @@ seqno_t writer_max_drop_seq (const struct writer *wr) return (n->min_seq == MAX_SEQ_NUMBER) ? wr->seq : n->min_seq; } -int writer_must_have_hb_scheduled (const struct writer *wr) +int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst) { - if (ut_avlIsEmpty (&wr->readers) || whc_empty (wr->whc)) + if (ut_avlIsEmpty (&wr->readers) || whcst->max_seq < 0) { /* Can't transmit a valid heartbeat if there is no data; and it wouldn't actually be sent anywhere if there are no readers, so @@ -2528,7 +2513,7 @@ int writer_must_have_hb_scheduled (const struct writer *wr) requiring a non-empty whc_seq: if it is transient_local, whc_seq usually won't be empty even when all msgs have been ack'd. 
*/ - return writer_max_drop_seq (wr) < whc_max_seq (wr->whc); + return writer_max_drop_seq (wr) < whcst->max_seq; } } @@ -2550,21 +2535,19 @@ void writer_clear_retransmitting (struct writer *wr) os_condBroadcast (&wr->throttle_cond); } -unsigned remove_acked_messages (struct writer *wr, struct whc_node **deferred_free_list) +unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, struct whc_node **deferred_free_list) { unsigned n; - size_t n_unacked; assert (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER); ASSERT_MUTEX_HELD (&wr->e.lock); - n = whc_remove_acked_messages (wr->whc, writer_max_drop_seq (wr), deferred_free_list); + n = wr->whc->ops->remove_acked_messages (wr->whc, writer_max_drop_seq (wr), whcst, deferred_free_list); /* when transitioning from >= low-water to < low-water, signal anyone waiting in throttle_writer() */ - n_unacked = whc_unacked_bytes (wr->whc); - if (wr->throttling && n_unacked <= wr->whc_low) + if (wr->throttling && whcst->unacked_bytes <= wr->whc_low) os_condBroadcast (&wr->throttle_cond); - if (wr->retransmitting && whc_unacked_bytes (wr->whc) == 0) + if (wr->retransmitting && whcst->unacked_bytes == 0) writer_clear_retransmitting (wr); - if (wr->state == WRST_LINGERING && n_unacked == 0) + if (wr->state == WRST_LINGERING && whcst->unacked_bytes == 0) { nn_log (LC_DISCOVERY, "remove_acked_messages: deleting lingering writer %x:%x:%x:%x\n", PGUID (wr->e.guid)); delete_writer_nolinger_locked (wr); @@ -2572,26 +2555,8 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_node **deferred_fr return n; } -unsigned remove_acked_messages_and_free (struct writer *wr) +static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity) { - struct whc_node *deferred_free_list; - unsigned n = remove_acked_messages (wr, &deferred_free_list); - whc_free_deferred_free_list (wr->whc, deferred_free_list); - return n; -} - -static struct writer * new_writer_guid -( - const struct nn_guid *guid, - const struct nn_guid *group_guid, - struct participant *pp, - const struct sertopic *topic, - const struct nn_xqos *xqos, - status_cb_t status_cb, - void * status_entity -) -{ - const size_t sample_overhead = 80; /* INFO_TS + DATA (approximate figure) + inline QoS */ struct writer *wr; nn_mtime_t tnow = now_mt (); @@ -2782,46 +2747,21 @@ static struct writer * new_writer_guid } wr->lease_duration = T_NEVER; /* FIXME */ - /* Construct WHC -- if aggressive_keep_last1 is set, the WHC will - drop all samples for which a later update is available. This - forces it to maintain a tlidx. 
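
Every caller of remove_acked_messages now follows the same convention (the remove_acked_messages_and_free wrapper is gone): unlink acknowledged samples while holding the writer lock, then free them through the WHC's ops table after releasing it. A reduced sketch of that convention, matching the call sites in this patch; the helper name is an assumption:

/* Sketch: acknowledged samples are unlinked from the WHC under the writer
   lock, but the actual freeing is deferred until after the lock is dropped. */
static void example_ack_processing (struct writer *wr)
{
  struct whc_node *deferred_free_list = NULL;
  struct whc_state whcst;
  unsigned n;

  os_mutexLock (&wr->e.lock);
  n = remove_acked_messages (wr, &whcst, &deferred_free_list);
  (void) n; /* number of samples dropped; used for tracing at the real call sites */
  os_mutexUnlock (&wr->e.lock);

  /* free outside the lock, via the WHC implementation */
  wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list);
}
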
*/ + wr->whc = whc; + if (wr->xqos->history.kind == NN_KEEP_LAST_HISTORY_QOS && wr->aggressive_keep_last) { - unsigned hdepth, tldepth; - if (!wr->aggressive_keep_last || wr->xqos->history.kind == NN_KEEP_ALL_HISTORY_QOS) - hdepth = 0; - else - hdepth = (unsigned)wr->xqos->history.depth; - if (wr->handle_as_transient_local) { - if (wr->xqos->durability_service.history.kind == NN_KEEP_ALL_HISTORY_QOS) - tldepth = 0; - else - tldepth = (unsigned)wr->xqos->durability_service.history.depth; - } else if (wr->startup_mode) { - tldepth = hdepth; - } else { - tldepth = 0; - } - if (hdepth == 0 && tldepth == 0) - { - /* no index at all - so no need to bother with startup mode */ - wr->startup_mode = 0; - } - wr->whc = whc_new (wr->handle_as_transient_local, hdepth, tldepth, sample_overhead); - if (hdepth > 0) - { - /* hdepth > 0 => "aggressive keep last", and in that case: why - bother blocking for a slow receiver when the entire point of - KEEP_LAST is to keep going (at least in a typical interpretation - of the spec. */ - wr->whc_low = wr->whc_high = INT32_MAX; - } - else - { - wr->whc_low = config.whc_lowwater_mark; - wr->whc_high = config.whc_init_highwater_mark.value; - } - assert (!is_builtin_entityid(wr->e.guid.entityid, ownvendorid) || (wr->whc_low == wr->whc_high && wr->whc_low == INT32_MAX)); + /* hdepth > 0 => "aggressive keep last", and in that case: why + bother blocking for a slow receiver when the entire point of + KEEP_LAST is to keep going (at least in a typical interpretation + of the spec. */ + wr->whc_low = wr->whc_high = INT32_MAX; } + else + { + wr->whc_low = config.whc_lowwater_mark; + wr->whc_high = config.whc_init_highwater_mark.value; + } + assert (!is_builtin_entityid(wr->e.guid.entityid, ownvendorid) || (wr->whc_low == wr->whc_high && wr->whc_low == INT32_MAX)); /* Connection admin */ ut_avlInit (&wr_readers_treedef, &wr->readers); @@ -2855,16 +2795,7 @@ static struct writer * new_writer_guid return wr; } -struct writer * new_writer -( - struct nn_guid *wrguid, - const struct nn_guid *group_guid, - const struct nn_guid *ppguid, - const struct sertopic *topic, - const struct nn_xqos *xqos, - status_cb_t status_cb, - void * status_cb_arg -) +struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg) { struct participant *pp; struct writer * wr; @@ -2882,7 +2813,7 @@ struct writer * new_writer wrguid->prefix = pp->e.guid.prefix; if (pp_allocate_entityid (&wrguid->entityid, entity_kind, pp) < 0) return NULL; - wr = new_writer_guid (wrguid, group_guid, pp, topic, xqos, status_cb, status_cb_arg); + wr = new_writer_guid (wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg); return wr; } @@ -2929,7 +2860,7 @@ static void gc_delete_writer (struct gcreq *gcreq) (wr->status_cb) (wr->status_cb_entity, NULL); } - whc_free (wr->whc); + wr->whc->ops->free (wr->whc); #ifdef DDSI_INCLUDE_SSM if (wr->ssm_as) unref_addrset (wr->ssm_as); @@ -3012,6 +2943,7 @@ int delete_writer_nolinger (const struct nn_guid *guid) int delete_writer (const struct nn_guid *guid) { struct writer *wr; + struct whc_state whcst; if ((wr = ephash_lookup_writer_guid (guid)) == NULL) { nn_log (LC_DISCOVERY, "delete_writer(guid %x:%x:%x:%x) - unknown guid\n", PGUID (*guid)); @@ -3024,7 +2956,8 @@ int delete_writer (const struct nn_guid *guid) be the usual case), do it immediately. 
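
The writer no longer constructs its own WHC: new_writer/new_writer_guid take a struct whc * from the caller, and the built-in writers above simply pass whc_new (1, 1, 1). For an application writer the depth computation that used to live in new_writer_guid moves to the caller (dds_writer.c, not shown in this excerpt); the sketch below mirrors the block removed here in simplified form (startup-mode handling omitted, helper name hypothetical):

/* Approximate caller-side construction: hdepth is the writer history depth
   when "aggressive" KEEP_LAST is in effect, tldepth is the durability-service
   depth for transient-local writers; 0 means unlimited / not indexed. */
static struct whc *example_make_whc (const struct nn_xqos *xqos, int handle_as_transient_local, int aggressive_keep_last)
{
  unsigned hdepth, tldepth;
  if (!aggressive_keep_last || xqos->history.kind == NN_KEEP_ALL_HISTORY_QOS)
    hdepth = 0;
  else
    hdepth = (unsigned) xqos->history.depth;
  if (!handle_as_transient_local)
    tldepth = 0;
  else if (xqos->durability_service.history.kind == NN_KEEP_ALL_HISTORY_QOS)
    tldepth = 0;
  else
    tldepth = (unsigned) xqos->durability_service.history.depth;
  return whc_new (handle_as_transient_local, hdepth, tldepth);
}
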
If more data is still coming in (which can't really happen at the moment, but might again in the future) it'll potentially be discarded. */ - if (whc_unacked_bytes (wr->whc) == 0) + wr->whc->ops->get_state(wr->whc, &whcst); + if (whcst.unacked_bytes == 0) { nn_log (LC_DISCOVERY, "delete_writer(guid %x:%x:%x:%x) - no unack'ed samples\n", PGUID (*guid)); delete_writer_nolinger_locked (wr); @@ -3047,22 +2980,20 @@ int delete_writer (const struct nn_guid *guid) void writer_exit_startup_mode (struct writer *wr) { + struct whc_node *deferred_free_list = NULL; os_mutexLock (&wr->e.lock); - /* startup mode and handle_as_transient_local may not both be set */ - assert (!(wr->startup_mode && wr->handle_as_transient_local)); - if (!wr->startup_mode) - nn_log (LC_DISCOVERY, " wr %x:%x:%x:%x skipped\n", PGUID (wr->e.guid)); - else + if (wr->startup_mode) { - unsigned n; - assert (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER); + unsigned cnt = 0; + struct whc_state whcst; wr->startup_mode = 0; - whc_downgrade_to_volatile (wr->whc); - n = remove_acked_messages_and_free (wr); + cnt += remove_acked_messages (wr, &whcst, &deferred_free_list); + cnt += wr->whc->ops->downgrade_to_volatile (wr->whc, &whcst); writer_clear_retransmitting (wr); - nn_log (LC_DISCOVERY, " wr %x:%x:%x:%x dropped %u entr%s\n", PGUID (wr->e.guid), n, n == 1 ? "y" : "ies"); + nn_log (LC_DISCOVERY, " %x:%x:%x:%x: dropped %u samples\n", PGUID(wr->e.guid), cnt); } os_mutexUnlock (&wr->e.lock); + wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list); } uint64_t writer_instance_id (const struct nn_guid *guid) @@ -4379,16 +4310,19 @@ static void proxy_reader_set_delete_and_ack_all_messages (struct proxy_reader *p os_mutexUnlock (&prd->e.lock); if ((wr = ephash_lookup_writer_guid (&wrguid)) != NULL) { + struct whc_node *deferred_free_list = NULL; struct wr_prd_match *m_wr; os_mutexLock (&wr->e.lock); if ((m_wr = ut_avlLookup (&wr_readers_treedef, &wr->readers, &prd->e.guid)) != NULL) { + struct whc_state whcst; m_wr->seq = MAX_SEQ_NUMBER; ut_avlAugmentUpdate (&wr_readers_treedef, m_wr); - remove_acked_messages_and_free (wr); + (void)remove_acked_messages (wr, &whcst, &deferred_free_list); writer_clear_retransmitting (wr); } os_mutexUnlock (&wr->e.lock); + wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list); } wrguid = wrguid_next; diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 4c453d9..36ee8b4 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -38,7 +38,6 @@ #include "ddsi/q_ephash.h" #include "ddsi/q_lease.h" #include "ddsi/q_gc.h" -#include "ddsi/q_whc.h" #include "ddsi/q_entity.h" #include "ddsi/q_nwif.h" #include "ddsi/q_globals.h" @@ -59,6 +58,7 @@ #include "ddsi/ddsi_mcgroup.h" #include "dds__tkmap.h" +#include "dds__whc.h" static void add_peer_addresses (struct addrset *as, const struct config_peer_listelem *list) { diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 02e74b8..0988ce2 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -41,7 +41,6 @@ #include "ddsi/q_ephash.h" #include "ddsi/q_lease.h" #include "ddsi/q_gc.h" -#include "ddsi/q_whc.h" #include "ddsi/q_entity.h" #include "ddsi/q_xmsg.h" #include "ddsi/q_receive.h" @@ -52,6 +51,7 @@ #include "ddsi/ddsi_mcgroup.h" #include "ddsi/sysdeps.h" +#include "dds__whc.h" /* Notes: @@ -588,7 +588,7 @@ static int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader return 0; } -static void 
force_heartbeat_to_peer (struct writer *wr, struct proxy_reader *prd, int hbansreq) +static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *whcst, struct proxy_reader *prd, int hbansreq) { struct nn_xmsg *m; @@ -603,7 +603,7 @@ static void force_heartbeat_to_peer (struct writer *wr, struct proxy_reader *prd return; } - if (whc_empty (wr->whc) && !config.respond_to_rti_init_zero_ack_with_invalid_heartbeat) + if (WHCST_ISEMPTY(whcst) && !config.respond_to_rti_init_zero_ack_with_invalid_heartbeat) { /* If WHC is empty, we send a Gap combined with a Heartbeat. The Gap reuses the latest sequence number (or consumes a new one if @@ -622,14 +622,14 @@ static void force_heartbeat_to_peer (struct writer *wr, struct proxy_reader *prd UPDATE_SEQ_XMIT_LOCKED(wr, 1); } add_Gap (m, wr, prd, seq, seq+1, 1, &bits); - add_Heartbeat (m, wr, hbansreq, prd->e.guid.entityid, 1); + add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 1); TRACE (("force_heartbeat_to_peer: %x:%x:%x:%x -> %x:%x:%x:%x - whc empty, queueing gap #%"PRId64" + heartbeat for transmit\n", PGUID (wr->e.guid), PGUID (prd->e.guid), seq)); } else { /* Send a Heartbeat just to this peer */ - add_Heartbeat (m, wr, hbansreq, prd->e.guid.entityid, 0); + add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 0); TRACE (("force_heartbeat_to_peer: %x:%x:%x:%x -> %x:%x:%x:%x - queue for transmit\n", PGUID (wr->e.guid), PGUID (prd->e.guid))); } @@ -638,7 +638,7 @@ static void force_heartbeat_to_peer (struct writer *wr, struct proxy_reader *prd static seqno_t grow_gap_to_next_seq (const struct writer *wr, seqno_t seq) { - seqno_t next_seq = whc_next_seq (wr->whc, seq - 1); + seqno_t next_seq = wr->whc->ops->next_seq (wr->whc, seq - 1); seqno_t seq_xmit = READ_SEQ_XMIT(wr); if (next_seq == MAX_SEQ_NUMBER) /* no next sample */ return seq_xmit + 1; @@ -715,6 +715,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac uint32_t msgs_sent, msgs_lost; seqno_t max_seq_in_reply; struct whc_node *deferred_free_list = NULL; + struct whc_state whcst; unsigned i; int hb_sent_in_response = 0; memset (gapbits, 0, sizeof (gapbits)); @@ -828,19 +829,21 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac rn->seq = wr->seq; } ut_avlAugmentUpdate (&wr_readers_treedef, rn); - n = remove_acked_messages (wr, &deferred_free_list); + n = remove_acked_messages (wr, &whcst, &deferred_free_list); TRACE ((" ACK%"PRId64" RM%u", n_ack, n)); } + else + { + /* There's actually no guarantee that we need this information */ + wr->whc->ops->get_state(wr->whc, &whcst); + } /* If this reader was marked as "non-responsive" in the past, it's now responding again, so update its status */ if (rn->seq == MAX_SEQ_NUMBER && prd->c.xqos->reliability.kind == NN_RELIABLE_RELIABILITY_QOS) { seqno_t oldest_seq; - if (whc_empty (wr->whc)) - oldest_seq = wr->seq; - else - oldest_seq = whc_max_seq (wr->whc); + oldest_seq = WHCST_ISEMPTY(&whcst) ? 
wr->seq : whcst.max_seq; rn->has_replied_to_hb = 1; /* was temporarily cleared to ensure heartbeats went out */ rn->seq = seqbase - 1; if (oldest_seq > rn->seq) { @@ -880,19 +883,19 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac data in our WHC, we start sending it regardless of whether the remote reader asked for it */ TRACE ((" preemptive-nack")); - if (whc_empty (wr->whc)) + if (WHCST_ISEMPTY(&whcst)) { TRACE ((" whc-empty ")); - force_heartbeat_to_peer (wr, prd, 0); + force_heartbeat_to_peer (wr, &whcst, prd, 0); hb_sent_in_response = 1; } else { TRACE ((" rebase ")); - force_heartbeat_to_peer (wr, prd, 0); + force_heartbeat_to_peer (wr, &whcst, prd, 0); hb_sent_in_response = 1; numbits = config.accelerate_rexmit_block_size; - seqbase = whc_min_seq (wr->whc); + seqbase = whcst.min_seq; } } else if (!rn->assumed_in_sync) @@ -940,25 +943,25 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac if (i >= msg->readerSNState.numbits || nn_bitset_isset (numbits, msg->readerSNState.bits, i)) { seqno_t seq = seqbase + i; - struct whc_node *whcn; - if ((whcn = whc_findseq (wr->whc, seq)) != NULL) + struct whc_borrowed_sample sample; + if (wr->whc->ops->borrow_sample (wr->whc, seq, &sample)) { - if (!wr->retransmitting && whcn->unacked) + if (!wr->retransmitting && sample.unacked) writer_set_retransmitting (wr); if (config.retransmit_merging != REXMIT_MERGE_NEVER && rn->assumed_in_sync) { /* send retransmit to all receivers, but skip if recently done */ nn_mtime_t tstamp = now_mt (); - if (tstamp.v > whcn->last_rexmit_ts.v + config.retransmit_merging_period) + if (tstamp.v > sample.last_rexmit_ts.v + config.retransmit_merging_period) { TRACE ((" RX%"PRId64, seqbase + i)); - enqueued = (enqueue_sample_wrlock_held (wr, seq, whcn->plist, whcn->serdata, NULL, 0) >= 0); + enqueued = (enqueue_sample_wrlock_held (wr, seq, sample.plist, sample.serdata, NULL, 0) >= 0); if (enqueued) { max_seq_in_reply = seqbase + i; msgs_sent++; - whcn->last_rexmit_ts = tstamp; + sample.last_rexmit_ts = tstamp; } } else @@ -970,14 +973,16 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac { /* no merging, send directed retransmit */ TRACE ((" RX%"PRId64"", seqbase + i)); - enqueued = (enqueue_sample_wrlock_held (wr, seq, whcn->plist, whcn->serdata, prd, 0) >= 0); + enqueued = (enqueue_sample_wrlock_held (wr, seq, sample.plist, sample.serdata, prd, 0) >= 0); if (enqueued) { max_seq_in_reply = seqbase + i; msgs_sent++; - whcn->rexmit_count++; + sample.rexmit_count++; } } + + wr->whc->ops->return_sample(wr->whc, &sample, true); } else if (gapstart == -1) { @@ -1054,7 +1059,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac if (msgs_sent && max_seq_in_reply < seq_xmit) { TRACE ((" rexmit#%"PRIu32" maxseq:%"PRId64"<%"PRId64"<=%"PRId64"", msgs_sent, max_seq_in_reply, seq_xmit, wr->seq)); - force_heartbeat_to_peer (wr, prd, 1); + force_heartbeat_to_peer (wr, &whcst, prd, 1); hb_sent_in_response = 1; /* The primary purpose of hbcontrol_note_asyncwrite is to ensure @@ -1062,16 +1067,16 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac gradually lowering rate. If we just got a request for a retransmit, and there is more to be retransmitted, surely the rate should be kept up for now */ - writer_hbcontrol_note_asyncwrite (wr, now_mt ()); + writer_hbcontrol_note_asyncwrite (wr, &whcst, now_mt ()); } /* If "final" flag not set, we must respond with a heartbeat. 
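
Retransmit paths no longer hold on to whc_node pointers; they borrow a sample, read what they need, and hand it back. A minimal sketch of that protocol as used in handle_AckNack above; the meaning of return_sample's last argument (whether the retransmit bookkeeping updated on the borrowed copy is stored back) is inferred from the true/false call sites here and should be treated as an assumption:

/* Sketch: retransmit one sequence number to a proxy reader using the
   borrow/return protocol (writer lock held, as at the real call sites). */
static void example_retransmit_one (struct writer *wr, seqno_t seq, struct proxy_reader *prd)
{
  struct whc_borrowed_sample sample;
  if (wr->whc->ops->borrow_sample (wr->whc, seq, &sample))
  {
    if (enqueue_sample_wrlock_held (wr, seq, sample.plist, sample.serdata, prd, 0) >= 0)
      sample.rexmit_count++;                              /* bookkeeping on the borrowed copy */
    wr->whc->ops->return_sample (wr->whc, &sample, true); /* true: store bookkeeping back */
  }
  else
  {
    /* sample no longer in the WHC: the real code responds with a Gap instead */
  }
}
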
Do it now if we haven't done so already */ if (!(msg->smhdr.flags & ACKNACK_FLAG_FINAL) && !hb_sent_in_response) - force_heartbeat_to_peer (wr, prd, 0); + force_heartbeat_to_peer (wr, &whcst, prd, 0); TRACE ((")")); out: os_mutexUnlock (&wr->e.lock); - whc_free_deferred_free_list (wr->whc, deferred_free_list); + wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list); return 1; } @@ -1428,7 +1433,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N struct proxy_reader *prd; struct wr_prd_match *rn; struct writer *wr; - struct whc_node *whcn; + struct whc_borrowed_sample sample; nn_guid_t src, dst; nn_count_t *countp; seqno_t seq = fromSN (msg->writerSN); @@ -1493,7 +1498,25 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N /* Resend the requested fragments if we still have the sample, send a Gap if we don't have them anymore. */ - if ((whcn = whc_findseq (wr->whc, seq)) == NULL) + if (wr->whc->ops->borrow_sample (wr->whc, seq, &sample)) + { + const unsigned base = msg->fragmentNumberState.bitmap_base - 1; + int enqueued = 1; + TRACE ((" scheduling requested frags ...\n")); + for (i = 0; i < msg->fragmentNumberState.numbits && enqueued; i++) + { + if (nn_bitset_isset (msg->fragmentNumberState.numbits, msg->fragmentNumberState.bits, i)) + { + struct nn_xmsg *reply; + if (create_fragment_message (wr, seq, sample.plist, sample.serdata, base + i, prd, &reply, 0) < 0) + enqueued = 0; + else + enqueued = qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0); + } + } + wr->whc->ops->return_sample (wr->whc, &sample, false); + } + else { static unsigned zero = 0; struct nn_xmsg *m; @@ -1512,30 +1535,15 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N qxev_msg (wr->evq, m); } } - else - { - const unsigned base = msg->fragmentNumberState.bitmap_base - 1; - int enqueued = 1; - TRACE ((" scheduling requested frags ...\n")); - for (i = 0; i < msg->fragmentNumberState.numbits && enqueued; i++) - { - if (nn_bitset_isset (msg->fragmentNumberState.numbits, msg->fragmentNumberState.bits, i)) - { - struct nn_xmsg *reply; - if (create_fragment_message (wr, seq, whcn->plist, whcn->serdata, base + i, prd, &reply, 0) < 0) - enqueued = 0; - else - enqueued = qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0); - } - } - } if (seq < READ_SEQ_XMIT(wr)) { /* Not everything was retransmitted yet, so force a heartbeat out to give the reader a chance to nack the rest and make sure hearbeats will go out at a reasonably high rate for a while */ - force_heartbeat_to_peer (wr, prd, 1); - writer_hbcontrol_note_asyncwrite (wr, now_mt ()); + struct whc_state whcst; + wr->whc->ops->get_state(wr->whc, &whcst); + force_heartbeat_to_peer (wr, &whcst, prd, 1); + writer_hbcontrol_note_asyncwrite (wr, &whcst, now_mt ()); } out: diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 0479dd2..2245013 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -15,7 +15,6 @@ #include "os/os.h" #include "util/ut_avl.h" -#include "ddsi/q_whc.h" #include "ddsi/q_entity.h" #include "ddsi/q_addrset.h" #include "ddsi/q_xmsg.h" @@ -36,6 +35,7 @@ #include "ddsi/ddsi_ser.h" #include "ddsi/sysdeps.h" +#include "dds__whc.h" #if __STDC_VERSION__ >= 199901L #define POS_INFINITY_DOUBLE INFINITY @@ -84,7 +84,7 @@ static void writer_hbcontrol_note_hb (struct writer *wr, nn_mtime_t tnow, int an hbc->hbs_since_last_write++; } -int64_t writer_hbcontrol_intv (const struct writer *wr, UNUSED_ARG 
(nn_mtime_t tnow)) +int64_t writer_hbcontrol_intv (const struct writer *wr, const struct whc_state *whcst, UNUSED_ARG (nn_mtime_t tnow)) { struct hbcontrol const * const hbc = &wr->hbcontrol; int64_t ret = config.const_hb_intv_sched; @@ -97,7 +97,7 @@ int64_t writer_hbcontrol_intv (const struct writer *wr, UNUSED_ARG (nn_mtime_t t ret *= 2; } - n_unacked = whc_unacked_bytes (wr->whc); + n_unacked = whcst->unacked_bytes; if (n_unacked >= wr->whc_low + 3 * (wr->whc_high - wr->whc_low) / 4) ret /= 2; if (n_unacked >= wr->whc_low + (wr->whc_high - wr->whc_low) / 2) @@ -109,7 +109,7 @@ int64_t writer_hbcontrol_intv (const struct writer *wr, UNUSED_ARG (nn_mtime_t t return ret; } -void writer_hbcontrol_note_asyncwrite (struct writer *wr, nn_mtime_t tnow) +void writer_hbcontrol_note_asyncwrite (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow) { struct hbcontrol * const hbc = &wr->hbcontrol; nn_mtime_t tnext; @@ -131,13 +131,13 @@ void writer_hbcontrol_note_asyncwrite (struct writer *wr, nn_mtime_t tnow) } } -int writer_hbcontrol_must_send (const struct writer *wr, nn_mtime_t tnow /* monotonic */) +int writer_hbcontrol_must_send (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow /* monotonic */) { struct hbcontrol const * const hbc = &wr->hbcontrol; - return (tnow.v >= hbc->t_of_last_hb.v + writer_hbcontrol_intv (wr, tnow)); + return (tnow.v >= hbc->t_of_last_hb.v + writer_hbcontrol_intv (wr, whcst, tnow)); } -struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, nn_mtime_t tnow, int hbansreq, int issync) +struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, int hbansreq, int issync) { struct nn_xmsg *msg; const nn_guid_t *prd_guid; @@ -200,7 +200,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, nn_mtime_t #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (msg, wr->partition_id); #endif - add_Heartbeat (msg, wr, hbansreq, to_entityid (NN_ENTITYID_UNKNOWN), issync); + add_Heartbeat (msg, wr, whcst, hbansreq, to_entityid (NN_ENTITYID_UNKNOWN), issync); } else { @@ -221,14 +221,14 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, nn_mtime_t #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (msg, wr->partition_id); #endif - add_Heartbeat (msg, wr, hbansreq, prd_guid->entityid, issync); + add_Heartbeat (msg, wr, whcst, hbansreq, prd_guid->entityid, issync); } writer_hbcontrol_note_hb (wr, tnow, hbansreq); return msg; } -static int writer_hbcontrol_ack_required_generic (const struct writer *wr, nn_mtime_t tlast, nn_mtime_t tnow, int piggyback) +static int writer_hbcontrol_ack_required_generic (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tlast, nn_mtime_t tnow, int piggyback) { struct hbcontrol const * const hbc = &wr->hbcontrol; const int64_t hb_intv_ack = config.const_hb_intv_sched; @@ -251,7 +251,7 @@ static int writer_hbcontrol_ack_required_generic (const struct writer *wr, nn_mt return 2; } - if (whc_unacked_bytes (wr->whc) >= wr->whc_low + (wr->whc_high - wr->whc_low) / 2) + if (whcst->unacked_bytes >= wr->whc_low + (wr->whc_high - wr->whc_low) / 2) { if (tnow.v >= hbc->t_of_last_ackhb.v + config.const_hb_intv_sched_min) return 2; @@ -262,13 +262,13 @@ static int writer_hbcontrol_ack_required_generic (const struct writer *wr, nn_mt return 0; } -int writer_hbcontrol_ack_required (const struct writer *wr, nn_mtime_t tnow) +int writer_hbcontrol_ack_required (const struct writer *wr, const 
struct whc_state *whcst, nn_mtime_t tnow) { struct hbcontrol const * const hbc = &wr->hbcontrol; - return writer_hbcontrol_ack_required_generic (wr, hbc->t_of_last_write, tnow, 0); + return writer_hbcontrol_ack_required_generic (wr, whcst, hbc->t_of_last_write, tnow, 0); } -struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, nn_mtime_t tnow, unsigned packetid, int *hbansreq) +struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, unsigned packetid, int *hbansreq) { struct hbcontrol * const hbc = &wr->hbcontrol; unsigned last_packetid; @@ -284,20 +284,20 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, nn_mtime_t tnow, /* Update statistics, intervals, scheduling of heartbeat event, &c. -- there's no real difference between async and sync so we reuse the async version. */ - writer_hbcontrol_note_asyncwrite (wr, tnow); + writer_hbcontrol_note_asyncwrite (wr, whcst, tnow); - *hbansreq = writer_hbcontrol_ack_required_generic (wr, tlast, tnow, 1); + *hbansreq = writer_hbcontrol_ack_required_generic (wr, whcst, tlast, tnow, 1); if (*hbansreq >= 2) { /* So we force a heartbeat in - but we also rely on our caller to send the packet out */ - msg = writer_hbcontrol_create_heartbeat (wr, tnow, *hbansreq, 1); + msg = writer_hbcontrol_create_heartbeat (wr, whcst, tnow, *hbansreq, 1); } else if (last_packetid != packetid) { /* If we crossed a packet boundary since the previous write, piggyback a heartbeat, with *hbansreq determining whether or not an ACK is needed. We don't force the packet out either: this is just to ensure a regular flow of ACKs for cleaning up the WHC & for allowing readers to NACK missing samples. */ - msg = writer_hbcontrol_create_heartbeat (wr, tnow, *hbansreq, 1); + msg = writer_hbcontrol_create_heartbeat (wr, whcst, tnow, *hbansreq, 1); } else { *hbansreq = 0; msg = NULL; @@ -311,13 +311,13 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, nn_mtime_t tnow, (hbc->tsched.v == T_NEVER) ? POS_INFINITY_DOUBLE : (double) (hbc->tsched.v - tnow.v) / 1e9, ut_avlIsEmpty (&wr->readers) ? -1 : root_rdmatch (wr)->min_seq, ut_avlIsEmpty (&wr->readers) || root_rdmatch (wr)->all_have_replied_to_hb ? "" : "!", - whc_empty (wr->whc) ? -1 : whc_max_seq (wr->whc), READ_SEQ_XMIT(wr))); + whcst->max_seq, READ_SEQ_XMIT(wr))); } return msg; } -void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, int hbansreq, nn_entityid_t dst, int issync) +void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, nn_entityid_t dst, int issync) { struct nn_xmsg_marker sm_marker; Heartbeat_t * hb; @@ -343,7 +343,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, int hbansreq, nn_ent hb->readerId = nn_hton_entityid (dst); hb->writerId = nn_hton_entityid (wr->e.guid.entityid); - if (whc_empty (wr->whc)) + if (WHCST_ISEMPTY(whcst)) { /* Really don't have data. Fake one at the current wr->seq. 
We're not really allowed to generate heartbeats when the WHC is @@ -360,7 +360,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, int hbansreq, nn_ent else { seqno_t seq_xmit; - min = whc_min_seq (wr->whc); + min = whcst->min_seq; max = wr->seq; seq_xmit = READ_SEQ_XMIT(wr); assert (min <= max); @@ -669,7 +669,7 @@ static int must_skip_frag (const char *frags_to_skip, unsigned frag) } #endif -static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *wr, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew, unsigned nfrags) +static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew, unsigned nfrags) { unsigned i; #if 0 @@ -714,8 +714,7 @@ static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer * struct nn_xmsg *msg = NULL; int hbansreq; os_mutexLock (&wr->e.lock); - msg = writer_hbcontrol_piggyback - (wr, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq); + msg = writer_hbcontrol_piggyback (wr, whcst, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq); os_mutexUnlock (&wr->e.lock); if (msg) { @@ -726,7 +725,7 @@ static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer * } } -static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew) +static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew) { /* on entry: &wr->e.lock held; on exit: lock no longer held */ struct nn_xmsg *fmsg; @@ -739,7 +738,7 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, unsigned nfrags; os_mutexUnlock (&wr->e.lock); nfrags = (sz + config.fragment_size - 1) / config.fragment_size; - transmit_sample_lgmsg_unlocked (xp, wr, seq, plist, serdata, prd, isnew, nfrags); + transmit_sample_lgmsg_unlocked (xp, wr, whcst, seq, plist, serdata, prd, isnew, nfrags); return; } else if (create_fragment_message_simple (wr, seq, serdata, &fmsg) < 0) @@ -754,7 +753,7 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, /* Note: wr->heartbeat_xevent != NULL <=> wr is reliable */ if (wr->heartbeat_xevent) - hmsg = writer_hbcontrol_piggyback (wr, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq); + hmsg = writer_hbcontrol_piggyback (wr, whcst, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq); else hmsg = NULL; @@ -855,7 +854,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist if (!do_insert) res = 0; - else if ((insres = whc_insert (wr->whc, writer_max_drop_seq (wr), seq, plist, serdata, tk)) < 0) + else if ((insres = wr->whc->ops->insert (wr->whc, writer_max_drop_seq (wr), seq, plist, serdata, tk)) < 0) res = insres; else res = 1; @@ -863,16 +862,18 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist #ifndef NDEBUG if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) { - if (whc_findmax (wr->whc) == NULL) + struct whc_state whcst; + wr->whc->ops->get_state(wr->whc, &whcst); + if (WHCST_ISEMPTY(&whcst)) assert (wr->c.pp->builtins_deleted); } #endif return res; } -static int 
writer_may_continue (const struct writer *wr) +static int writer_may_continue (const struct writer *wr, const struct whc_state *whcst) { - return (whc_unacked_bytes (wr->whc) <= wr->whc_low && !wr->retransmitting) || (wr->state != WRST_OPERATIONAL); + return (whcst->unacked_bytes <= wr->whc_low && !wr->retransmitting) || (wr->state != WRST_OPERATIONAL); } @@ -915,7 +916,8 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) os_result result = os_resultSuccess; nn_mtime_t tnow = now_mt (); const nn_mtime_t abstimeout = add_duration_to_mtime (tnow, nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time)); - size_t n_unacked = whc_unacked_bytes (wr->whc); + struct whc_state whcst; + wr->whc->ops->get_state(wr->whc, &whcst); { nn_vendorid_t ownvendorid = MY_VENDOR_ID; @@ -925,7 +927,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) assert (!is_builtin_entityid(wr->e.guid.entityid, ownvendorid)); } - nn_log (LC_THROTTLE, "writer %x:%x:%x:%x waiting for whc to shrink below low-water mark (whc %"PRIuSIZE" low=%u high=%u)\n", PGUID (wr->e.guid), n_unacked, wr->whc_low, wr->whc_high); + nn_log (LC_THROTTLE, "writer %x:%x:%x:%x waiting for whc to shrink below low-water mark (whc %"PRIuSIZE" low=%u high=%u)\n", PGUID (wr->e.guid), whcst.unacked_bytes, wr->whc_low, wr->whc_high); wr->throttling = 1; wr->throttle_count++; @@ -934,7 +936,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) things the wrong way round ... */ if (xp) { - struct nn_xmsg *hbmsg = writer_hbcontrol_create_heartbeat (wr, tnow, 1, 1); + struct nn_xmsg *hbmsg = writer_hbcontrol_create_heartbeat (wr, &whcst, tnow, 1, 1); os_mutexUnlock (&wr->e.lock); if (hbmsg) { @@ -944,7 +946,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) os_mutexLock (&wr->e.lock); } - while (gv.rtps_keepgoing && !writer_may_continue (wr)) + while (gv.rtps_keepgoing && !writer_may_continue (wr, &whcst)) { int64_t reltimeout; tnow = now_mt (); @@ -958,6 +960,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) thread_state_asleep (lookup_thread_state()); result = os_condTimedWait (&wr->throttle_cond, &wr->e.lock, &timeout); thread_state_awake (lookup_thread_state()); + wr->whc->ops->get_state(wr->whc, &whcst); } if (result == os_resultTimeout) { @@ -972,8 +975,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) os_condBroadcast (&wr->throttle_cond); } - n_unacked = whc_unacked_bytes (wr->whc); - nn_log (LC_THROTTLE, "writer %x:%x:%x:%x done waiting for whc to shrink below low-water mark (whc %"PRIuSIZE" low=%u high=%u)\n", PGUID (wr->e.guid), n_unacked, wr->whc_low, wr->whc_high); + nn_log (LC_THROTTLE, "writer %x:%x:%x:%x done waiting for whc to shrink below low-water mark (whc %"PRIuSIZE" low=%u high=%u)\n", PGUID (wr->e.guid), whcst.unacked_bytes, wr->whc_low, wr->whc_high); return result; } @@ -1028,8 +1030,9 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p /* If WHC overfull, block. 
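
Because struct whc_state is a point-in-time copy rather than a live view, code that blocks has to refresh it afterwards, which is why throttle_writer re-reads the state after every condition wait. A reduced sketch of that loop; the untimed os_condWait and the omission of the keep-going/timeout handling are simplifications relative to the real code:

/* Sketch: wait until the WHC has drained below the low-water mark,
   refreshing the snapshot after every wakeup (writer lock held on entry). */
static void example_wait_for_drain (struct writer *wr)
{
  struct whc_state whcst;
  wr->whc->ops->get_state (wr->whc, &whcst);        /* initial snapshot */
  while (whcst.unacked_bytes > wr->whc_low)
  {
    os_condWait (&wr->throttle_cond, &wr->e.lock);  /* drops and re-acquires wr->e.lock */
    wr->whc->ops->get_state (wr->whc, &whcst);      /* snapshot is stale after blocking */
  }
}
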
*/ { - size_t unacked_bytes = whc_unacked_bytes (wr->whc); - if (unacked_bytes > wr->whc_high) + struct whc_state whcst; + wr->whc->ops->get_state(wr->whc, &whcst); + if (whcst.unacked_bytes > wr->whc_high) { os_result ores; assert(gc_allowed); /* also see beginning of the function */ @@ -1038,7 +1041,7 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p else { maybe_grow_whc (wr); - if (unacked_bytes <= wr->whc_high) + if (whcst.unacked_bytes <= wr->whc_high) ores = os_resultSuccess; else ores = throttle_writer (xp, wr); @@ -1081,6 +1084,10 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p } else { + struct whc_state whcst; + if (wr->heartbeat_xevent) + wr->whc->ops->get_state(wr->whc, &whcst); + /* Note the subtlety of enqueueing with the lock held but transmitting without holding the lock. Still working on cleaning that up. */ @@ -1098,14 +1105,14 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p plist_copy = &plist_stk; nn_plist_copy (plist_copy, plist); } - transmit_sample_unlocks_wr (xp, wr, seq, plist_copy, serdata, NULL, 1); + transmit_sample_unlocks_wr (xp, wr, &whcst, seq, plist_copy, serdata, NULL, 1); if (plist_copy) nn_plist_fini (plist_copy); } else { if (wr->heartbeat_xevent) - writer_hbcontrol_note_asyncwrite (wr, tnow); + writer_hbcontrol_note_asyncwrite (wr, &whcst, tnow); enqueue_sample_wrlock_held (wr, seq, plist, serdata, NULL, 1); os_mutexUnlock (&wr->e.lock); } diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 72c5581..e7a5aa9 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -21,7 +21,6 @@ #include "ddsi/q_log.h" #include "ddsi/q_addrset.h" #include "ddsi/q_xmsg.h" -#include "ddsi/q_whc.h" #include "ddsi/q_xevent.h" #include "ddsi/q_thread.h" #include "ddsi/q_config.h" @@ -39,6 +38,7 @@ #include "ddsi/q_xmsg.h" #include "q__osplser.h" #include "ddsi/ddsi_ser.h" +#include "dds__whc.h" #include "ddsi/sysdeps.h" @@ -596,6 +596,7 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt struct writer *wr; nn_mtime_t t_next; int hbansreq = 0; + struct whc_state whcst; if ((wr = ephash_lookup_writer_guid (&ev->u.heartbeat.wr_guid)) == NULL) { @@ -606,23 +607,24 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt assert (wr->reliable); os_mutexLock (&wr->e.lock); - if (!writer_must_have_hb_scheduled (wr)) + wr->whc->ops->get_state(wr->whc, &whcst); + if (!writer_must_have_hb_scheduled (wr, &whcst)) { hbansreq = 1; /* just for trace */ msg = NULL; /* Need not send it now, and no need to schedule it for the future */ t_next.v = T_NEVER; } - else if (!writer_hbcontrol_must_send (wr, tnow)) + else if (!writer_hbcontrol_must_send (wr, &whcst, tnow)) { hbansreq = 1; /* just for trace */ msg = NULL; - t_next.v = tnow.v + writer_hbcontrol_intv (wr, tnow); + t_next.v = tnow.v + writer_hbcontrol_intv (wr, &whcst, tnow); } else { - hbansreq = writer_hbcontrol_ack_required (wr, tnow); - msg = writer_hbcontrol_create_heartbeat (wr, tnow, hbansreq, 0); - t_next.v = tnow.v + writer_hbcontrol_intv (wr, tnow); + hbansreq = writer_hbcontrol_ack_required (wr, &whcst, tnow); + msg = writer_hbcontrol_create_heartbeat (wr, &whcst, tnow, hbansreq, 0); + t_next.v = tnow.v + writer_hbcontrol_intv (wr, &whcst, tnow); } TRACE (("heartbeat(wr %x:%x:%x:%x%s) %s, resched in %g s (min-ack %"PRId64"%s, avail-seq %"PRId64", xmit %"PRId64")\n", @@ -632,7 +634,7 @@ static void 
handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt (t_next.v == T_NEVER) ? POS_INFINITY_DOUBLE : (double)(t_next.v - tnow.v) / 1e9, ut_avlIsEmpty (&wr->readers) ? (seqno_t) -1 : ((struct wr_prd_match *) ut_avlRootNonEmpty (&wr_readers_treedef, &wr->readers))->min_seq, ut_avlIsEmpty (&wr->readers) || ((struct wr_prd_match *) ut_avlRootNonEmpty (&wr_readers_treedef, &wr->readers))->all_have_replied_to_hb ? "" : "!", - whc_empty (wr->whc) ? (seqno_t) -1 : whc_max_seq (wr->whc), READ_SEQ_XMIT(wr))); + whcst.max_seq, READ_SEQ_XMIT(wr))); resched_xevent_if_earlier (ev, t_next); wr->hbcontrol.tsched = t_next; os_mutexUnlock (&wr->e.lock); @@ -963,10 +965,13 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e struct participant *pp; struct proxy_reader *prd; struct writer *spdp_wr; - struct whc_node *whcn; + struct whc_borrowed_sample sample; serstate_t st; serdata_t sd; nn_guid_t kh; +#ifndef NDEBUG + bool sample_found; +#endif if ((pp = ephash_lookup_participant_guid (&ev->u.spdp.pp_guid)) == NULL) { @@ -1011,21 +1016,31 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e sd = ddsi_serstate_fix (st); os_mutexLock (&spdp_wr->e.lock); - if ((whcn = whc_findkey (spdp_wr->whc, sd)) != NULL) + if (spdp_wr->whc->ops->borrow_sample_key (spdp_wr->whc, sd, &sample)) { /* Claiming it is new rather than a retransmit so that the rexmit limiting won't kick in. It is best-effort and therefore the updating of the last transmitted sequence number won't take place anyway. Nor is it necessary to fiddle with heartbeat control stuff. */ - enqueue_sample_wrlock_held (spdp_wr, whcn->seq, whcn->plist, whcn->serdata, prd, 1); + enqueue_sample_wrlock_held (spdp_wr, sample.seq, sample.plist, sample.serdata, prd, 1); + spdp_wr->whc->ops->return_sample(spdp_wr->whc, &sample, false); +#ifndef NDEBUG + sample_found = true; +#endif } +#ifndef NDEBUG + else + { + sample_found = false; + } +#endif os_mutexUnlock (&spdp_wr->e.lock); ddsi_serdata_unref (sd); #ifndef NDEBUG - if (whcn == NULL) + if (!sample_found) { /* If undirected, it is pp->spdp_xevent, and that one must never run into an empty WHC unless it is already marked for deletion.
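
The sample iterator introduced for writer_add_local_connection earlier in this patch is the generic way to walk the WHC without assuming anything about its interval/index internals. A minimal sketch, restricted to the iterator calls and borrowed-sample fields that actually appear at the call sites here; the helper itself is illustrative:

/* Sketch: walk every sample currently held in the WHC (writer lock held)
   and report the highest sequence number seen. */
static seqno_t example_scan_whc (struct writer *wr)
{
  struct whc_sample_iter it;
  struct whc_borrowed_sample sample;
  seqno_t maxseq = -1;
  wr->whc->ops->sample_iter_init (wr->whc, &it);
  while (wr->whc->ops->sample_iter_borrow_next (&it, &sample))
  {
    if (sample.seq > maxseq)
      maxseq = sample.seq;
    /* sample.serdata and sample.plist may be inspected here; the existing
       iterator loop in this patch does not call return_sample for iterated
       samples */
  }
  return maxseq;
}
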