abstract WHC and pull it out of core DDSI code

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2018-08-01 08:37:49 +02:00
parent 37953f5c49
commit 61d98b46a6
18 changed files with 733 additions and 469 deletions

View file

@ -31,3 +31,4 @@
69 src/dds_report.c
70 src/dds_builtin.c
72 src/dds_guardcond.c
73 src/dds_whc.c

View file

@ -41,6 +41,7 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds_guardcond.c
dds_subscriber.c
dds_write.c
dds_whc.c
)
PREPEND(hdrs_public_ddsc "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include/ddsc>$<INSTALL_INTERFACE:include/ddsc>"
@ -80,6 +81,7 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds__types.h
dds__write.h
dds__writer.h
dds__whc.h
q__osplser.h
)

View file

@ -190,6 +190,7 @@ typedef struct dds_writer
struct nn_xpack * m_xp;
struct writer * m_wr;
os_mutex m_call_lock;
struct whc *m_whc; /* FIXME: ownership still with underlying DDSI writer (cos of DDSI built-in writers )*/
/* Status metrics */

View file

@ -0,0 +1,27 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDS__WHC_H
#define DDS__WHC_H
#include "ddsi/q_whc.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth);
#if defined (__cplusplus)
}
#endif
#endif /* Q_WHC_H */

View file

@ -17,10 +17,95 @@
#include "ddsi/ddsi_ser.h"
#include "ddsi/q_unused.h"
#include "ddsi/q_config.h"
#include "ddsi/q_whc.h"
#include "q__osplser.h"
#include "dds__whc.h"
#include "dds__tkmap.h"
#include "util/ut_avl.h"
#include "util/ut_hopscotch.h"
#include "ddsi/q_time.h"
#include "ddsi/q_rtps.h"
#include "ddsi/q_freelist.h"
#define USE_EHH 0
struct whc_node {
struct whc_node *next_seq; /* next in this interval */
struct whc_node *prev_seq; /* prev in this interval */
struct whc_idxnode *idxnode; /* NULL if not in index */
unsigned idxnode_pos; /* index in idxnode.hist */
seqno_t seq;
uint64_t total_bytes; /* cumulative number of bytes up to and including this node */
size_t size;
struct nn_plist *plist; /* 0 if nothing special */
unsigned unacked: 1; /* counted in whc::unacked_bytes iff 1 */
unsigned borrowed: 1; /* at most one can borrow it at any time */
nn_mtime_t last_rexmit_ts;
unsigned rexmit_count;
struct serdata *serdata;
};
struct whc_intvnode {
ut_avlNode_t avlnode;
seqno_t min;
seqno_t maxp1;
struct whc_node *first; /* linked list of seqs with contiguous sequence numbers [min,maxp1) */
struct whc_node *last; /* valid iff first != NULL */
};
struct whc_idxnode {
int64_t iid;
seqno_t prune_seq;
struct tkmap_instance *tk;
unsigned headidx;
#if __STDC_VERSION__ >= 199901L
struct whc_node *hist[];
#else
struct whc_node *hist[1];
#endif
};
#if USE_EHH
struct whc_seq_entry {
seqno_t seq;
struct whc_node *whcn;
};
#endif
struct whc_impl {
struct whc common;
os_mutex lock;
unsigned seq_size;
size_t unacked_bytes;
size_t sample_overhead;
uint64_t total_bytes; /* total number of bytes pushed in */
unsigned is_transient_local: 1;
unsigned hdepth; /* 0 = unlimited */
unsigned tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */
unsigned idxdepth; /* = max(hdepth, tldepth) */
seqno_t max_drop_seq; /* samples in whc with seq <= max_drop_seq => transient-local */
struct whc_intvnode *open_intv; /* interval where next sample will go (usually) */
struct whc_node *maxseq_node; /* NULL if empty; if not in open_intv, open_intv is empty */
struct nn_freelist freelist; /* struct whc_node *; linked via whc_node::next_seq */
#if USE_EHH
struct ut_ehh *seq_hash;
#else
struct ut_hh *seq_hash;
#endif
struct ut_hh *idx_hash;
ut_avlTree_t seq;
};
struct whc_sample_iter_impl {
struct whc_impl *whc;
bool first;
};
/* check that our definition of whc_sample_iter fits in the type that callers allocate */
struct whc_sample_iter_sizecheck {
char fits_in_generic_type[sizeof(struct whc_sample_iter_impl) <= sizeof(struct whc_sample_iter) ? 1 : -1];
};
/* Avoiding all nn_log-related activities when LC_WHC is not set
(and it hardly ever is, as it is not even included in "trace")
saves a couple of % CPU on a high-rate publisher - that's worth
@ -47,14 +132,45 @@ static int trace_whc (const char *fmt, ...)
* - cleaning up after ACKs has additional pruning stage for same case
*/
static void insert_whcn_in_hash (struct whc *whc, struct whc_node *whcn);
static void whc_delete_one (struct whc *whc, struct whc_node *whcn);
static struct whc_node *whc_findseq (const struct whc_impl *whc, seqno_t seq);
static void insert_whcn_in_hash (struct whc_impl *whc, struct whc_node *whcn);
static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn);
static int compare_seq (const void *va, const void *vb);
static unsigned whc_remove_acked_messages_full (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list);
static unsigned whc_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list);
static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list);
static void get_state_locked(const struct whc_impl *whc, struct whc_state *st);
static unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list);
static void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list);
static void whc_get_state(const struct whc *whc, struct whc_state *st);
static int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata, struct tkmap_instance *tk);
static seqno_t whc_next_seq (const struct whc *whc, seqno_t seq);
static bool whc_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample);
static bool whc_borrow_sample_key (const struct whc *whc, const struct serdata *serdata_key, struct whc_borrowed_sample *sample);
static void whc_return_sample (struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info);
static unsigned whc_downgrade_to_volatile (struct whc *whc, struct whc_state *st);
static void whc_sample_iter_init (const struct whc *whc, struct whc_sample_iter *opaque_it);
static bool whc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample);
static void whc_free (struct whc *whc);
static const ut_avlTreedef_t whc_seq_treedef =
UT_AVL_TREEDEF_INITIALIZER (offsetof (struct whc_intvnode, avlnode), offsetof (struct whc_intvnode, min), compare_seq, 0);
static const struct whc_ops whc_ops = {
.insert = whc_insert,
.remove_acked_messages = whc_remove_acked_messages,
.free_deferred_free_list = whc_free_deferred_free_list,
.get_state = whc_get_state,
.next_seq = whc_next_seq,
.borrow_sample = whc_borrow_sample,
.borrow_sample_key = whc_borrow_sample_key,
.return_sample = whc_return_sample,
.sample_iter_init = whc_sample_iter_init,
.sample_iter_borrow_next = whc_sample_iter_borrow_next,
.downgrade_to_volatile = whc_downgrade_to_volatile,
.free = whc_free
};
#if USE_EHH
static uint32_t whc_seq_entry_hash (const void *vn)
{
@ -111,7 +227,7 @@ static int compare_seq (const void *va, const void *vb)
return (*a == *b) ? 0 : (*a < *b) ? -1 : 1;
}
static struct whc_node *whc_findmax_procedurally (const struct whc *whc)
static struct whc_node *whc_findmax_procedurally (const struct whc_impl *whc)
{
if (whc->seq_size == 0)
return NULL;
@ -128,7 +244,7 @@ static struct whc_node *whc_findmax_procedurally (const struct whc *whc)
}
}
static void check_whc (const struct whc *whc)
static void check_whc (const struct whc_impl *whc)
{
/* there's much more we can check, but it gets expensive quite
quickly: all nodes but open_intv non-empty, non-overlapping and
@ -155,7 +271,7 @@ static void check_whc (const struct whc *whc)
}
assert (whc->maxseq_node == whc_findmax_procedurally (whc));
#if 1 && !defined(NDEBUG)
#if !defined(NDEBUG)
{
struct whc_intvnode *firstintv;
struct whc_node *cur;
@ -174,7 +290,7 @@ static void check_whc (const struct whc *whc)
#endif
}
static void insert_whcn_in_hash (struct whc *whc, struct whc_node *whcn)
static void insert_whcn_in_hash (struct whc_impl *whc, struct whc_node *whcn)
{
/* precondition: whcn is not in hash */
#if USE_EHH
@ -187,7 +303,7 @@ static void insert_whcn_in_hash (struct whc *whc, struct whc_node *whcn)
#endif
}
static void remove_whcn_from_hash (struct whc *whc, struct whc_node *whcn)
static void remove_whcn_from_hash (struct whc_impl *whc, struct whc_node *whcn)
{
/* precondition: whcn is in hash */
#if USE_EHH
@ -200,7 +316,7 @@ static void remove_whcn_from_hash (struct whc *whc, struct whc_node *whcn)
#endif
}
struct whc_node *whc_findseq (const struct whc *whc, seqno_t seq)
static struct whc_node *whc_findseq (const struct whc_impl *whc, seqno_t seq)
{
#if USE_EHH
struct whc_seq_entry e = { .seq = seq }, *r;
@ -215,15 +331,36 @@ struct whc_node *whc_findseq (const struct whc *whc, seqno_t seq)
#endif
}
struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth, size_t sample_overhead)
static struct whc_node *whc_findkey (const struct whc_impl *whc, const struct serdata *serdata_key)
{
struct whc *whc;
union {
struct whc_idxnode idxn;
char pad[sizeof(struct whc_idxnode) + sizeof(struct whc_node *)];
} template;
struct whc_idxnode *n;
check_whc (whc);
template.idxn.iid = dds_tkmap_lookup(gv.m_tkmap, serdata_key);
n = ut_hhLookup (whc->idx_hash, &template.idxn);
if (n == NULL)
return NULL;
else
{
assert (n->hist[n->headidx]);
return n->hist[n->headidx];
}
}
struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth)
{
size_t sample_overhead = 80; /* INFO_TS, DATA (estimate), inline QoS */
struct whc_impl *whc;
struct whc_intvnode *intv;
assert((hdepth == 0 || tldepth <= hdepth) || is_transient_local);
whc = os_malloc (sizeof (*whc));
whc->common.ops = &whc_ops;
os_mutexInit (&whc->lock);
whc->is_transient_local = is_transient_local ? 1 : 0;
whc->hdepth = hdepth;
whc->tldepth = tldepth;
@ -257,10 +394,10 @@ struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth,
nn_freelist_init (&whc->freelist, UINT32_MAX, offsetof (struct whc_node, next_seq));
check_whc (whc);
return whc;
return (struct whc *)whc;
}
static void free_whc_node_contents (struct whc *whc, struct whc_node *whcn)
static void free_whc_node_contents (struct whc_node *whcn)
{
ddsi_serdata_unref (whcn->serdata);
if (whcn->plist) {
@ -269,9 +406,10 @@ static void free_whc_node_contents (struct whc *whc, struct whc_node *whcn)
}
}
void whc_free (struct whc *whc)
void whc_free (struct whc *whc_generic)
{
/* Freeing stuff without regards for maintaining data structures */
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
check_whc (whc);
if (whc->idx_hash)
@ -292,7 +430,7 @@ void whc_free (struct whc *whc)
OS_WARNING_MSVC_OFF(6001);
whcn = whcn->prev_seq;
OS_WARNING_MSVC_ON(6001);
free_whc_node_contents (whc, tmp);
free_whc_node_contents (tmp);
os_free (tmp);
}
}
@ -305,45 +443,41 @@ OS_WARNING_MSVC_ON(6001);
#else
ut_hhFree (whc->seq_hash);
#endif
os_mutexDestroy (&whc->lock);
os_free (whc);
}
int whc_empty (const struct whc *whc)
static void get_state_locked(const struct whc_impl *whc, struct whc_state *st)
{
return whc->seq_size == 0;
}
seqno_t whc_min_seq (const struct whc *whc)
{
/* precond: whc not empty */
if (whc->seq_size == 0)
{
st->min_seq = st->max_seq = -1;
st->unacked_bytes = 0;
}
else
{
const struct whc_intvnode *intv;
check_whc (whc);
assert (!whc_empty (whc));
intv = ut_avlFindMin (&whc_seq_treedef, &whc->seq);
assert (intv);
/* not empty, open node may be anything but is (by definition)
findmax, and whc is claimed to be non-empty, so min interval
can't be empty */
assert (intv->maxp1 > intv->min);
return intv->min;
st->min_seq = intv->min;
st->max_seq = whc->maxseq_node->seq;
st->unacked_bytes = whc->unacked_bytes;
}
}
struct whc_node *whc_findmax (const struct whc *whc)
static void whc_get_state(const struct whc *whc_generic, struct whc_state *st)
{
const struct whc_impl * const whc = (const struct whc_impl *)whc_generic;
os_mutexLock ((struct os_mutex *)&whc->lock);
check_whc (whc);
return (struct whc_node *) whc->maxseq_node;
get_state_locked(whc, st);
os_mutexUnlock ((struct os_mutex *)&whc->lock);
}
seqno_t whc_max_seq (const struct whc *whc)
{
/* precond: whc not empty */
check_whc (whc);
assert (!whc_empty (whc));
assert (whc->maxseq_node != NULL);
return whc->maxseq_node->seq;
}
static struct whc_node *find_nextseq_intv (struct whc_intvnode **p_intv, const struct whc *whc, seqno_t seq)
static struct whc_node *find_nextseq_intv (struct whc_intvnode **p_intv, const struct whc_impl *whc, seqno_t seq)
{
struct whc_node *n;
struct whc_intvnode *intv;
@ -385,25 +519,23 @@ static struct whc_node *find_nextseq_intv (struct whc_intvnode **p_intv, const s
}
}
seqno_t whc_next_seq (const struct whc *whc, seqno_t seq)
static seqno_t whc_next_seq (const struct whc *whc_generic, seqno_t seq)
{
const struct whc_impl * const whc = (const struct whc_impl *)whc_generic;
struct whc_node *n;
struct whc_intvnode *intv;
seqno_t nseq;
os_mutexLock ((struct os_mutex *)&whc->lock);
check_whc (whc);
if ((n = find_nextseq_intv (&intv, whc, seq)) == NULL)
return MAX_SEQ_NUMBER;
nseq = MAX_SEQ_NUMBER;
else
return n->seq;
nseq = n->seq;
os_mutexUnlock ((struct os_mutex *)&whc->lock);
return nseq;
}
struct whc_node* whc_next_node(const struct whc *whc, seqno_t seq)
{
struct whc_intvnode *intv;
check_whc (whc);
return find_nextseq_intv(&intv, whc, seq);
}
static void delete_one_sample_from_idx (struct whc *whc, struct whc_node *whcn)
static void delete_one_sample_from_idx (struct whc_impl *whc, struct whc_node *whcn)
{
struct whc_idxnode * const idxn = whcn->idxnode;
assert (idxn != NULL);
@ -426,7 +558,7 @@ static void delete_one_sample_from_idx (struct whc *whc, struct whc_node *whcn)
whcn->idxnode = NULL;
}
static void free_one_instance_from_idx (struct whc *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn)
static void free_one_instance_from_idx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn)
{
unsigned i;
for (i = 0; i < whc->idxdepth; i++)
@ -446,14 +578,14 @@ static void free_one_instance_from_idx (struct whc *whc, seqno_t max_drop_seq, s
os_free(idxn);
}
static void delete_one_instance_from_idx (struct whc *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn)
static void delete_one_instance_from_idx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn)
{
if (!ut_hhRemove (whc->idx_hash, idxn))
assert (0);
free_one_instance_from_idx (whc, max_drop_seq, idxn);
}
static int whcn_in_tlidx (const struct whc *whc, const struct whc_idxnode *idxn, unsigned pos)
static int whcn_in_tlidx (const struct whc_impl *whc, const struct whc_idxnode *idxn, unsigned pos)
{
if (idxn == NULL)
return 0;
@ -465,19 +597,24 @@ static int whcn_in_tlidx (const struct whc *whc, const struct whc_idxnode *idxn,
}
}
void whc_downgrade_to_volatile (struct whc *whc)
static unsigned whc_downgrade_to_volatile (struct whc *whc_generic, struct whc_state *st)
{
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
seqno_t old_max_drop_seq;
struct whc_node *deferred_free_list;
unsigned cnt;
/* We only remove them from whc->tlidx: we don't remove them from
whc->seq yet. That'll happen eventually. */
os_mutexLock (&whc->lock);
check_whc (whc);
if (whc->idxdepth == 0)
{
/* if not maintaining an index at all, this is nonsense */
return;
get_state_locked(whc, st);
os_mutexUnlock (&whc->lock);
return 0;
}
assert (!whc->is_transient_local);
@ -502,18 +639,21 @@ void whc_downgrade_to_volatile (struct whc *whc)
them all. */
old_max_drop_seq = whc->max_drop_seq;
whc->max_drop_seq = 0;
whc_remove_acked_messages_full (whc, old_max_drop_seq, &deferred_free_list);
whc_free_deferred_free_list (whc, deferred_free_list);
cnt = whc_remove_acked_messages_full (whc, old_max_drop_seq, &deferred_free_list);
whc_free_deferred_free_list (whc_generic, deferred_free_list);
assert (whc->max_drop_seq == old_max_drop_seq);
get_state_locked(whc, st);
os_mutexUnlock (&whc->lock);
return cnt;
}
static size_t whcn_size (const struct whc *whc, const struct whc_node *whcn)
static size_t whcn_size (const struct whc_impl *whc, const struct whc_node *whcn)
{
size_t sz = ddsi_serdata_size (whcn->serdata);
return sz + ((sz + config.fragment_size - 1) / config.fragment_size) * whc->sample_overhead;
}
static void whc_delete_one_intv (struct whc *whc, struct whc_intvnode **p_intv, struct whc_node **p_whcn)
static void whc_delete_one_intv (struct whc_impl *whc, struct whc_intvnode **p_intv, struct whc_node **p_whcn)
{
/* Removes *p_whcn, possibly deleting or splitting *p_intv, as the
case may be. Does *NOT* update whc->seq_size. *p_intv must be
@ -612,7 +752,7 @@ static void whc_delete_one_intv (struct whc *whc, struct whc_intvnode **p_intv,
}
}
static void whc_delete_one (struct whc *whc, struct whc_node *whcn)
static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn)
{
struct whc_intvnode *intv;
struct whc_node *whcn_tmp = whcn;
@ -624,11 +764,11 @@ static void whc_delete_one (struct whc *whc, struct whc_node *whcn)
if (whcn_tmp->next_seq)
whcn_tmp->next_seq->prev_seq = whcn_tmp->prev_seq;
whcn_tmp->next_seq = NULL;
whc_free_deferred_free_list (whc, whcn_tmp);
free_deferred_free_list (whc, whcn_tmp);
whc->seq_size--;
}
void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list)
static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list)
{
if (deferred_free_list)
{
@ -637,7 +777,8 @@ void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_fre
for (cur = deferred_free_list, last = NULL; cur; last = cur, cur = cur->next_seq)
{
n++;
free_whc_node_contents (whc, cur);
if (!cur->borrowed)
free_whc_node_contents (cur);
}
cur = nn_freelist_pushmany (&whc->freelist, deferred_free_list, last, n);
while (cur)
@ -649,7 +790,13 @@ void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_fre
}
}
static unsigned whc_remove_acked_messages_noidx (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list)
static void whc_free_deferred_free_list (struct whc *whc_generic, struct whc_node *deferred_free_list)
{
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
free_deferred_free_list(whc, deferred_free_list);
}
static unsigned whc_remove_acked_messages_noidx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list)
{
struct whc_intvnode *intv;
struct whc_node *whcn;
@ -725,7 +872,7 @@ static unsigned whc_remove_acked_messages_noidx (struct whc *whc, seqno_t max_dr
return ndropped;
}
static unsigned whc_remove_acked_messages_full (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list)
static unsigned whc_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list)
{
struct whc_intvnode *intv;
struct whc_node *whcn;
@ -859,49 +1006,36 @@ static unsigned whc_remove_acked_messages_full (struct whc *whc, seqno_t max_dro
return ndropped;
}
unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list)
static unsigned whc_remove_acked_messages (struct whc *whc_generic, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list)
{
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
unsigned cnt;
os_mutexLock (&whc->lock);
assert (max_drop_seq < MAX_SEQ_NUMBER);
assert (max_drop_seq >= whc->max_drop_seq);
if (config.enabled_logcats & LC_WHC)
{
struct whc_state whcst;
get_state_locked(whc, &whcst);
TRACE_WHC(("whc_remove_acked_messages(%p max_drop_seq %"PRId64")\n", (void *)whc, max_drop_seq));
TRACE_WHC((" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %u tl %u\n",
whc_empty(whc) ? (seqno_t)-1 : whc_min_seq(whc),
whc_empty(whc) ? (seqno_t)-1 : whc_max_seq(whc),
whc->max_drop_seq, whc->hdepth, whc->tldepth));
whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth));
}
check_whc (whc);
if (whc->idxdepth == 0)
{
return whc_remove_acked_messages_noidx (whc, max_drop_seq, deferred_free_list);
}
cnt = whc_remove_acked_messages_noidx (whc, max_drop_seq, deferred_free_list);
else
{
return whc_remove_acked_messages_full (whc, max_drop_seq, deferred_free_list);
}
cnt = whc_remove_acked_messages_full (whc, max_drop_seq, deferred_free_list);
get_state_locked(whc, whcst);
os_mutexUnlock (&whc->lock);
return cnt;
}
struct whc_node *whc_findkey (const struct whc *whc, const struct serdata *serdata_key)
{
union {
struct whc_idxnode idxn;
char pad[sizeof(struct whc_idxnode) + sizeof(struct whc_node *)];
} template;
struct whc_idxnode *n;
check_whc (whc);
template.idxn.iid = dds_tkmap_lookup(gv.m_tkmap, serdata_key);
n = ut_hhLookup (whc->idx_hash, &template.idxn);
if (n == NULL)
return NULL;
else
{
assert (n->hist[n->headidx]);
return n->hist[n->headidx];
}
}
static struct whc_node *whc_insert_seq (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata)
static struct whc_node *whc_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata)
{
struct whc_node *newn = NULL;
@ -910,6 +1044,7 @@ static struct whc_node *whc_insert_seq (struct whc *whc, seqno_t max_drop_seq, s
newn->seq = seq;
newn->plist = plist;
newn->unacked = (seq > max_drop_seq);
newn->borrowed = 0;
newn->idxnode = NULL; /* initial state, may be changed */
newn->idxnode_pos = 0;
newn->last_rexmit_ts.v = 0;
@ -961,21 +1096,27 @@ static struct whc_node *whc_insert_seq (struct whc *whc, seqno_t max_drop_seq, s
return newn;
}
int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata, struct tkmap_instance *tk)
static int whc_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, serdata_t serdata, struct tkmap_instance *tk)
{
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
struct whc_node *newn = NULL;
struct whc_idxnode *idxn;
union {
struct whc_idxnode idxn;
char pad[sizeof(struct whc_idxnode) + sizeof(struct whc_node *)];
} template;
os_mutexLock (&whc->lock);
check_whc (whc);
if (config.enabled_logcats & LC_WHC)
{
struct whc_state whcst;
get_state_locked(whc, &whcst);
TRACE_WHC(("whc_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%x)\n", (void *)whc, max_drop_seq, seq, (void*)plist, (void*)serdata, *(unsigned *)serdata->v.keyhash.m_hash));
TRACE_WHC((" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %u tl %u\n",
whc_empty(whc) ? (seqno_t)-1 : whc_min_seq(whc),
whc_empty(whc) ? (seqno_t)-1 : whc_max_seq(whc),
whc->max_drop_seq, whc->hdepth, whc->tldepth));
whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth));
}
assert (max_drop_seq < MAX_SEQ_NUMBER);
assert (max_drop_seq >= whc->max_drop_seq);
@ -983,7 +1124,7 @@ int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_pl
/* Seq must be greater than what is currently stored. Usually it'll
be the next sequence number, but if there are no readers
temporarily, a gap may be among the possibilities */
assert (whc_empty (whc) || seq > whc_max_seq (whc));
assert (whc->seq_size == 0 || seq > whc->maxseq_node->seq);
/* Always insert in seq admin */
newn = whc_insert_seq (whc, max_drop_seq, seq, plist, serdata);
@ -994,6 +1135,7 @@ int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_pl
if (ddsi_serdata_is_empty(serdata) || whc->idxdepth == 0)
{
TRACE_WHC((" empty or no hist\n"));
os_mutexUnlock (&whc->lock);
return 0;
}
@ -1089,10 +1231,123 @@ int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_pl
}
TRACE_WHC(("\n"));
}
os_mutexUnlock (&whc->lock);
return 0;
}
size_t whc_unacked_bytes (struct whc *whc)
static void make_borrowed_sample(struct whc_borrowed_sample *sample, struct whc_node *whcn)
{
return whc->unacked_bytes;
assert(!whcn->borrowed);
whcn->borrowed = 1;
sample->seq = whcn->seq;
sample->plist = whcn->plist;
sample->serdata = whcn->serdata;
sample->unacked = whcn->unacked;
sample->rexmit_count = whcn->rexmit_count;
sample->last_rexmit_ts = whcn->last_rexmit_ts;
}
static bool whc_borrow_sample (const struct whc *whc_generic, seqno_t seq, struct whc_borrowed_sample *sample)
{
const struct whc_impl * const whc = (const struct whc_impl *)whc_generic;
struct whc_node *whcn;
bool found;
os_mutexLock ((os_mutex *)&whc->lock);
if ((whcn = whc_findseq(whc, seq)) == NULL)
found = false;
else
{
make_borrowed_sample(sample, whcn);
found = true;
}
os_mutexUnlock ((os_mutex *)&whc->lock);
return found;
}
static bool whc_borrow_sample_key (const struct whc *whc_generic, const struct serdata *serdata_key, struct whc_borrowed_sample *sample)
{
const struct whc_impl * const whc = (const struct whc_impl *)whc_generic;
struct whc_node *whcn;
bool found;
os_mutexLock ((os_mutex *)&whc->lock);
if ((whcn = whc_findkey(whc, serdata_key)) == NULL)
found = false;
else
{
make_borrowed_sample(sample, whcn);
found = true;
}
os_mutexUnlock ((os_mutex *)&whc->lock);
return found;
}
static void return_sample_locked (struct whc_impl *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info)
{
struct whc_node *whcn;
if ((whcn = whc_findseq (whc, sample->seq)) == NULL)
{
/* data no longer present in WHC - that means ownership for serdata, plist shifted to the borrowed copy and "returning" it really becomes "destroying" it */
ddsi_serdata_unref (sample->serdata);
if (sample->plist) {
nn_plist_fini (sample->plist);
os_free (sample->plist);
}
}
else
{
assert(whcn->borrowed);
whcn->borrowed = 0;
if (update_retransmit_info)
{
whcn->rexmit_count = sample->rexmit_count;
whcn->last_rexmit_ts = sample->last_rexmit_ts;
}
}
}
static void whc_return_sample (struct whc *whc_generic, struct whc_borrowed_sample *sample, bool update_retransmit_info)
{
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
os_mutexLock (&whc->lock);
return_sample_locked (whc, sample, update_retransmit_info);
os_mutexUnlock (&whc->lock);
}
static void whc_sample_iter_init (const struct whc *whc_generic, struct whc_sample_iter *opaque_it)
{
const struct whc_impl * const whc = (const struct whc_impl *)whc_generic;
struct whc_sample_iter_impl *it = (struct whc_sample_iter_impl *)opaque_it->opaque.opaque;
it->whc = (struct whc_impl *)whc;
it->first = true;
}
static bool whc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample)
{
struct whc_sample_iter_impl * const it = (struct whc_sample_iter_impl *)opaque_it->opaque.opaque;
struct whc_impl * const whc = it->whc;
struct whc_node *whcn;
struct whc_intvnode *intv;
seqno_t seq;
bool valid;
os_mutexLock (&whc->lock);
check_whc (whc);
if (!it->first)
{
seq = sample->seq;
return_sample_locked(whc, sample, false);
}
else
{
it->first = false;
seq = 0;
}
if ((whcn = find_nextseq_intv (&intv, whc, seq)) == NULL)
valid = false;
else
{
make_borrowed_sample(sample, whcn);
valid = true;
}
os_mutexUnlock (&whc->lock);
return valid;
}

View file

@ -21,6 +21,7 @@
#include "dds__err.h"
#include "dds__init.h"
#include "dds__tkmap.h"
#include "dds__whc.h"
#include "dds__report.h"
#include "ddsc/ddsc_project.h"
@ -233,6 +234,8 @@ dds_writer_delete(
assert(e);
assert(thr);
/* FIXME: not freeing WHC here because it is owned by the DDSI entity */
if (asleep) {
thread_state_awake(thr);
}
@ -338,6 +341,62 @@ dds_writer_qos_set(
return ret;
}
static struct whc *make_whc(const dds_qos_t *qos)
{
bool startup_mode;
bool handle_as_transient_local;
unsigned hdepth, tldepth;
/* Startup mode causes the writer to treat data in its WHC as if
transient-local, for the first few seconds after startup of the
DDSI service. It is done for volatile reliable writers only
(which automatically excludes all builtin writers) or for all
writers except volatile best-effort & transient-local ones.
Which one to use depends on whether merge policies are in effect
in durability. If yes, then durability will take care of all
transient & persistent data; if no, DDSI discovery usually takes
too long and this'll save you.
Note: may still be cleared, if it turns out we are not maintaining
an index at all (e.g., volatile KEEP_ALL) */
if (config.startup_mode_full) {
startup_mode = gv.startup_mode &&
(qos->durability.kind >= DDS_DURABILITY_TRANSIENT ||
(qos->durability.kind == DDS_DURABILITY_VOLATILE &&
qos->reliability.kind != DDS_RELIABILITY_BEST_EFFORT));
} else {
startup_mode = gv.startup_mode &&
(qos->durability.kind == DDS_DURABILITY_VOLATILE &&
qos->reliability.kind != DDS_RELIABILITY_BEST_EFFORT);
}
/* Construct WHC -- if aggressive_keep_last1 is set, the WHC will
drop all samples for which a later update is available. This
forces it to maintain a tlidx. */
handle_as_transient_local = (qos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL);
if (!config.aggressive_keep_last_whc || qos->history.kind == DDS_HISTORY_KEEP_ALL)
hdepth = 0;
else
hdepth = (unsigned)qos->history.depth;
if (handle_as_transient_local) {
if (qos->durability_service.history.kind == DDS_HISTORY_KEEP_ALL)
tldepth = 0;
else
tldepth = (unsigned)qos->durability_service.history.depth;
} else if (startup_mode) {
tldepth = (hdepth == 0) ? 1 : hdepth;
} else {
tldepth = 0;
}
if (hdepth == 0 && tldepth == 0)
{
/* no index at all - so no need to bother with startup mode */
startup_mode = 0;
}
return whc_new (handle_as_transient_local, hdepth, tldepth);
}
_Pre_satisfies_(((participant_or_publisher & DDS_ENTITY_KIND_MASK) == DDS_KIND_PUBLISHER) || \
((participant_or_publisher & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT))
@ -426,6 +485,7 @@ dds_create_writer(
wr->m_entity.m_deriver.set_qos = dds_writer_qos_set;
wr->m_entity.m_deriver.validate_status = dds_writer_status_validate;
wr->m_entity.m_deriver.get_instance_hdl = dds_writer_instance_hdl;
wr->m_whc = make_whc (wqos);
/* Extra claim of this writer to make sure that the delete waits until DDSI
* has deleted its writer as well. This can be known through the callback. */
@ -439,8 +499,7 @@ dds_create_writer(
if (asleep) {
thread_state_awake(thr);
}
wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_participant->m_guid, ((dds_topic*)tp)->m_stopic,
wqos, dds_writer_status_cb, wr);
wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_participant->m_guid, ((dds_topic*)tp)->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
os_mutexLock(&pub->m_mutex);
os_mutexLock(&tp->m_mutex);
assert(wr->m_wr);

View file

@ -33,7 +33,6 @@
32 src/q_thread_inlines.c
33 src/q_time.c
34 src/q_transmit.c
35 src/q_whc.c
36 src/q_xevent.c
37 src/q_xmsg.c
38 src/q_freelist.c

View file

@ -50,7 +50,6 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
q_time.c
q_transmit.c
q_inverse_uint32_set.c
q_whc.c
q_xevent.c
q_xmsg.c
q_freelist.c

View file

@ -474,34 +474,15 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity
GUID "ppguid". May return NULL if participant unknown or
writer/reader already known. */
struct writer * new_writer
(
struct nn_guid *wrguid,
const struct nn_guid *group_guid,
const struct nn_guid *ppguid,
const struct sertopic *topic,
const struct nn_xqos *xqos,
status_cb_t status_cb,
void * status_cb_arg
);
struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg);
struct reader * new_reader
(
struct nn_guid *rdguid,
const struct nn_guid *group_guid,
const struct nn_guid *ppguid,
const struct sertopic *topic,
const struct nn_xqos *xqos,
struct rhc * rhc,
status_cb_t status_cb,
void * status_cb_arg
);
struct reader * new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void * status_cb_arg);
struct whc_node;
unsigned remove_acked_messages (struct writer *wr, struct whc_node **deferred_free_list);
unsigned remove_acked_messages_and_free (struct writer *wr);
struct whc_state;
unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, struct whc_node **deferred_free_list);
seqno_t writer_max_drop_seq (const struct writer *wr);
int writer_must_have_hb_scheduled (const struct writer *wr);
int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst);
void writer_set_retransmitting (struct writer *wr);
void writer_clear_retransmitting (struct writer *wr);

View file

@ -17,6 +17,7 @@ extern "C" {
#endif
struct writer;
struct whc_state;
struct hbcontrol {
nn_mtime_t t_of_last_write;
@ -28,12 +29,12 @@ struct hbcontrol {
};
void writer_hbcontrol_init (struct hbcontrol *hbc);
int64_t writer_hbcontrol_intv (const struct writer *wr, nn_mtime_t tnow);
void writer_hbcontrol_note_asyncwrite (struct writer *wr, nn_mtime_t tnow);
int writer_hbcontrol_ack_required (const struct writer *wr, nn_mtime_t tnow);
struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, nn_mtime_t tnow, unsigned packetid, int *hbansreq);
int writer_hbcontrol_must_send (const struct writer *wr, nn_mtime_t tnow);
struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, nn_mtime_t tnow, int hbansreq, int issync);
int64_t writer_hbcontrol_intv (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow);
void writer_hbcontrol_note_asyncwrite (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow);
int writer_hbcontrol_ack_required (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow);
struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, unsigned packetid, int *hbansreq);
int writer_hbcontrol_must_send (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow);
struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, int hbansreq, int issync);
#if defined (__cplusplus)
}

View file

@ -22,6 +22,7 @@ extern "C" {
struct nn_xpack;
struct nn_xmsg;
struct writer;
struct whc_state;
struct proxy_reader;
struct serdata;
struct tkmap_instance;
@ -41,7 +42,7 @@ int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, struct serda
/* When calling the following functions, wr->lock must be held */
int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct serdata *serdata, struct proxy_reader *prd, int isnew);
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, int hbansreq, nn_entityid_t dst, int issync);
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, nn_entityid_t dst, int issync);
#if defined (__cplusplus)
}

View file

@ -12,108 +12,83 @@
#ifndef Q_WHC_H
#define Q_WHC_H
#include "util/ut_avl.h"
#include "util/ut_hopscotch.h"
#include "ddsi/q_time.h"
#include "ddsi/q_rtps.h"
#include "ddsi/q_freelist.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct serdata;
struct nn_plist;
struct whc_idxnode;
struct tkmap_instance;
struct whc_node; /* opaque, but currently used for deferred free lists */
struct whc;
#define USE_EHH 0
struct whc_node {
struct whc_node *next_seq; /* next in this interval */
struct whc_node *prev_seq; /* prev in this interval */
struct whc_idxnode *idxnode; /* NULL if not in index */
unsigned idxnode_pos; /* index in idxnode.hist */
struct whc_borrowed_sample {
seqno_t seq;
uint64_t total_bytes; /* cumulative number of bytes up to and including this node */
size_t size;
struct nn_plist *plist; /* 0 if nothing special */
unsigned unacked: 1; /* counted in whc::unacked_bytes iff 1 */
struct serdata *serdata;
struct nn_plist *plist;
bool unacked;
nn_mtime_t last_rexmit_ts;
unsigned rexmit_count;
struct serdata *serdata;
};
struct whc_intvnode {
ut_avlNode_t avlnode;
seqno_t min;
seqno_t maxp1;
struct whc_node *first; /* linked list of seqs with contiguous sequence numbers [min,maxp1) */
struct whc_node *last; /* valid iff first != NULL */
};
struct whc_idxnode {
int64_t iid;
seqno_t prune_seq;
struct tkmap_instance *tk;
unsigned headidx;
#if __STDC_VERSION__ >= 199901L
struct whc_node *hist[];
#else
struct whc_node *hist[1];
#endif
};
#if USE_EHH
struct whc_seq_entry {
seqno_t seq;
struct whc_node *whcn;
};
#endif
struct whc {
unsigned seq_size;
struct whc_state {
seqno_t min_seq; /* -1 if WHC empty, else > 0 */
seqno_t max_seq; /* -1 if WHC empty, else >= min_seq */
size_t unacked_bytes;
size_t sample_overhead;
uint64_t total_bytes; /* total number of bytes pushed in */
unsigned is_transient_local: 1;
unsigned hdepth; /* 0 = unlimited */
unsigned tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */
unsigned idxdepth; /* = max(hdepth, tldepth) */
seqno_t max_drop_seq; /* samples in whc with seq <= max_drop_seq => transient-local */
struct whc_intvnode *open_intv; /* interval where next sample will go (usually) */
struct whc_node *maxseq_node; /* NULL if empty; if not in open_intv, open_intv is empty */
struct nn_freelist freelist; /* struct whc_node *; linked via whc_node::next_seq */
#if USE_EHH
struct ut_ehh *seq_hash;
#else
struct ut_hh *seq_hash;
#endif
struct ut_hh *idx_hash;
ut_avlTree_t seq;
};
#define WHCST_ISEMPTY(whcst) ((whcst)->max_seq == -1)
/* Adjust SIZE and alignment stuff as needed: they are here simply so we can allocate
an iter on the stack without specifying an implementation. If future changes or
implementations require more, these can be adjusted. An implementation should check
things fit at compile time. */
#define WHC_SAMPLE_ITER_SIZE (2*sizeof(void *))
struct whc_sample_iter {
union {
char opaque[WHC_SAMPLE_ITER_SIZE];
/* cover alignment requirements: */
uint64_t x;
double y;
void *p;
} opaque;
};
struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth, size_t sample_overhead);
void whc_free (struct whc *whc);
int whc_empty (const struct whc *whc);
seqno_t whc_min_seq (const struct whc *whc);
seqno_t whc_max_seq (const struct whc *whc);
seqno_t whc_next_seq (const struct whc *whc, seqno_t seq);
size_t whc_unacked_bytes (struct whc *whc);
struct whc_node *whc_findseq (const struct whc *whc, seqno_t seq);
struct whc_node *whc_findmax (const struct whc *whc);
struct whc_node *whc_findkey (const struct whc *whc, const struct serdata *serdata_key);
struct whc_node *whc_next_node (const struct whc *whc, seqno_t seq);
typedef seqno_t (*whc_next_seq_t)(const struct whc *whc, seqno_t seq);
typedef void (*whc_get_state_t)(const struct whc *whc, struct whc_state *st);
typedef bool (*whc_borrow_sample_t)(const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample);
typedef bool (*whc_borrow_sample_key_t)(const struct whc *whc, const struct serdata *serdata_key, struct whc_borrowed_sample *sample);
typedef void (*whc_return_sample_t)(struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info);
typedef void (*whc_sample_iter_init_t)(const struct whc *whc, struct whc_sample_iter *it);
typedef bool (*whc_sample_iter_borrow_next_t)(struct whc_sample_iter *it, struct whc_borrowed_sample *sample);
typedef void (*whc_free_t)(struct whc *whc);
/* min_seq is lowest sequence number that must be retained because of
reliable readers that have not acknowledged all data */
/* max_drop_seq must go soon, it's way too ugly. */
/* plist may be NULL or os_malloc'd, WHC takes ownership of plist */
int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct serdata *serdata, struct tkmap_instance *tk);
void whc_downgrade_to_volatile (struct whc *whc);
unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list);
void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list);
typedef int (*whc_insert_t)(struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct serdata *serdata, struct tkmap_instance *tk);
typedef unsigned (*whc_downgrade_to_volatile_t)(struct whc *whc, struct whc_state *st);
typedef unsigned (*whc_remove_acked_messages_t)(struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list);
typedef void (*whc_free_deferred_free_list_t)(struct whc *whc, struct whc_node *deferred_free_list);
struct whc_ops {
whc_insert_t insert;
whc_remove_acked_messages_t remove_acked_messages;
whc_free_deferred_free_list_t free_deferred_free_list;
whc_get_state_t get_state;
whc_next_seq_t next_seq;
whc_borrow_sample_t borrow_sample;
whc_borrow_sample_key_t borrow_sample_key;
whc_return_sample_t return_sample;
whc_sample_iter_init_t sample_iter_init;
whc_sample_iter_borrow_next_t sample_iter_borrow_next;
whc_downgrade_to_volatile_t downgrade_to_volatile;
whc_free_t free;
};
struct whc {
const struct whc_ops *ops;
};
#if defined (__cplusplus)
}

View file

@ -22,7 +22,6 @@
#include "ddsi/q_time.h"
#include "ddsi/q_misc.h"
#include "ddsi/q_log.h"
#include "ddsi/q_whc.h"
#include "ddsi/q_plist.h"
#include "q__osplser.h"
#include "ddsi/q_ephash.h"
@ -39,7 +38,7 @@
#include "ddsi/ddsi_tcp.h"
#include "ddsi/sysdeps.h"
#include "dds__whc.h"
struct plugin {
debug_monitor_plugin_t fn;
@ -185,14 +184,14 @@ static int print_participants (struct thread_state1 *self, ddsi_tran_conn_t conn
{
ut_avlIter_t rdit;
struct wr_prd_match *m;
struct whc_state whcst;
if (w->c.pp != p)
continue;
os_mutexLock (&w->e.lock);
print_endpoint_common (conn, "wr", &w->e, &w->c, w->xqos, w->topic);
w->whc->ops->get_state(w->whc, &whcst);
x += cpf (conn, " whc [%lld,%lld] unacked %"PRIuSIZE"%s [%u,%u] seq %lld seq_xmit %lld cs_seq %lld\n",
whc_empty (w->whc) ? -1 : whc_min_seq (w->whc),
whc_empty (w->whc) ? -1 : whc_max_seq (w->whc),
whc_unacked_bytes (w->whc),
whcst.min_seq, whcst.max_seq, whcst.unacked_bytes,
w->throttling ? " THROTTLING" : "",
w->whc_low, w->whc_high,
w->seq, READ_SEQ_XMIT(w), w->cs_seq);

View file

@ -22,7 +22,6 @@
#include "ddsi/q_misc.h"
#include "ddsi/q_log.h"
#include "util/ut_avl.h"
#include "ddsi/q_whc.h"
#include "ddsi/q_plist.h"
#include "ddsi/q_lease.h"
#include "q__osplser.h"
@ -42,6 +41,7 @@
#include "ddsi/ddsi_mcgroup.h"
#include "ddsi/sysdeps.h"
#include "dds__whc.h"
struct deleted_participant {
ut_avlNode_t avlnode;
@ -84,27 +84,8 @@ static const unsigned prismtech_builtin_writers_besmask =
NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER |
NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER;
static struct writer * new_writer_guid
(
const struct nn_guid *guid,
const struct nn_guid *group_guid,
struct participant *pp,
const struct sertopic *topic,
const struct nn_xqos *xqos,
status_cb_t status_cb,
void *status_cbarg
);
static struct reader * new_reader_guid
(
const struct nn_guid *guid,
const struct nn_guid *group_guid,
struct participant *pp,
const struct sertopic *topic,
const struct nn_xqos *xqos,
struct rhc *rhc,
status_cb_t status_cb,
void *status_cbarg
);
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg);
static struct reader * new_reader_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg);
static struct participant *ref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity);
static void unref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity);
static void delete_proxy_group_locked (struct proxy_group *pgroup, nn_wctime_t timestamp, int isimplicit);
@ -466,7 +447,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, LAST_WR_PARAMS);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, whc_new(1, 1, 1), LAST_WR_PARAMS);
/* But we need the as_disc address set for SPDP, because we need to
send it to everyone regardless of the existence of readers. */
{
@ -489,23 +470,23 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PARTICIPANT_WRITER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PUBLISHER_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_SUBSCRIBER_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER;
}
@ -513,7 +494,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
{
/* TODO: make this one configurable, we don't want all participants to publish all topics (or even just those that they use themselves) */
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_TOPIC_ANNOUNCER;
}
@ -521,7 +502,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, LAST_WR_PARAMS);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER;
}
@ -1311,17 +1292,18 @@ static void free_wr_rd_match (struct wr_rd_match *m)
static void writer_drop_connection (const struct nn_guid * wr_guid, const struct proxy_reader * prd)
{
struct whc_node *deferred_free_list;
struct writer *wr;
if ((wr = ephash_lookup_writer_guid (wr_guid)) != NULL)
{
struct whc_node *deferred_free_list = NULL;
struct wr_prd_match *m;
os_mutexLock (&wr->e.lock);
if ((m = ut_avlLookup (&wr_readers_treedef, &wr->readers, &prd->e.guid)) != NULL)
{
struct whc_state whcst;
ut_avlDelete (&wr_readers_treedef, &wr->readers, m);
rebuild_writer_addrset (wr);
remove_acked_messages (wr, &deferred_free_list);
remove_acked_messages (wr, &whcst, &deferred_free_list);
wr->num_reliable_readers -= m->is_reliable;
if (wr->status_cb)
{
@ -1333,6 +1315,7 @@ static void writer_drop_connection (const struct nn_guid * wr_guid, const struct
}
}
os_mutexUnlock (&wr->e.lock);
wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list);
free_wr_prd_match (m);
}
}
@ -1616,20 +1599,23 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd)
ut_avlInsertIPath (&wr_local_readers_treedef, &wr->local_readers, m, &path);
local_reader_ary_insert (&wr->rdary, rd);
/* Store available data into the late joining reader when it is reliable. */
if ((rd->xqos->reliability.kind > NN_BEST_EFFORT_RELIABILITY_QOS) /* transient reader */ &&
(!whc_empty(wr->whc)) /* data available */ )
/* Store available data into the late joining reader when it is reliable (we don't do
historical data for best-effort data over the wire, so also not locally).
FIXME: should limit ourselves to what it is available because of durability history,
not writer history */
if (rd->xqos->reliability.kind > NN_BEST_EFFORT_RELIABILITY_QOS && rd->xqos->durability.kind > NN_VOLATILE_DURABILITY_QOS)
{
seqno_t seq = -1;
struct whc_node *n;
while ((n = whc_next_node(wr->whc, seq)) != NULL)
struct whc_sample_iter it;
struct whc_borrowed_sample sample;
wr->whc->ops->sample_iter_init(wr->whc, &it);
while (wr->whc->ops->sample_iter_borrow_next(&it, &sample))
{
struct proxy_writer_info pwr_info;
serdata_t payload = n->serdata;
serdata_t payload = sample.serdata;
/* FIXME: whc has tk reference in its index nodes, which is what we really should be iterating over anyway, and so we don't really have to look them up anymore */
struct tkmap_instance *tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (payload);
make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
(void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk);
seq = n->seq;
}
}
@ -1645,7 +1631,6 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd)
data.handle = rd->e.iid;
(wr->status_cb) (wr->status_cb_entity, &data);
}
}
static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, nn_count_t *init_count)
@ -2502,9 +2487,9 @@ seqno_t writer_max_drop_seq (const struct writer *wr)
return (n->min_seq == MAX_SEQ_NUMBER) ? wr->seq : n->min_seq;
}
int writer_must_have_hb_scheduled (const struct writer *wr)
int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst)
{
if (ut_avlIsEmpty (&wr->readers) || whc_empty (wr->whc))
if (ut_avlIsEmpty (&wr->readers) || whcst->max_seq < 0)
{
/* Can't transmit a valid heartbeat if there is no data; and it
wouldn't actually be sent anywhere if there are no readers, so
@ -2528,7 +2513,7 @@ int writer_must_have_hb_scheduled (const struct writer *wr)
requiring a non-empty whc_seq: if it is transient_local,
whc_seq usually won't be empty even when all msgs have been
ack'd. */
return writer_max_drop_seq (wr) < whc_max_seq (wr->whc);
return writer_max_drop_seq (wr) < whcst->max_seq;
}
}
@ -2550,21 +2535,19 @@ void writer_clear_retransmitting (struct writer *wr)
os_condBroadcast (&wr->throttle_cond);
}
unsigned remove_acked_messages (struct writer *wr, struct whc_node **deferred_free_list)
unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, struct whc_node **deferred_free_list)
{
unsigned n;
size_t n_unacked;
assert (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
ASSERT_MUTEX_HELD (&wr->e.lock);
n = whc_remove_acked_messages (wr->whc, writer_max_drop_seq (wr), deferred_free_list);
n = wr->whc->ops->remove_acked_messages (wr->whc, writer_max_drop_seq (wr), whcst, deferred_free_list);
/* when transitioning from >= low-water to < low-water, signal
anyone waiting in throttle_writer() */
n_unacked = whc_unacked_bytes (wr->whc);
if (wr->throttling && n_unacked <= wr->whc_low)
if (wr->throttling && whcst->unacked_bytes <= wr->whc_low)
os_condBroadcast (&wr->throttle_cond);
if (wr->retransmitting && whc_unacked_bytes (wr->whc) == 0)
if (wr->retransmitting && whcst->unacked_bytes == 0)
writer_clear_retransmitting (wr);
if (wr->state == WRST_LINGERING && n_unacked == 0)
if (wr->state == WRST_LINGERING && whcst->unacked_bytes == 0)
{
nn_log (LC_DISCOVERY, "remove_acked_messages: deleting lingering writer %x:%x:%x:%x\n", PGUID (wr->e.guid));
delete_writer_nolinger_locked (wr);
@ -2572,26 +2555,8 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_node **deferred_fr
return n;
}
unsigned remove_acked_messages_and_free (struct writer *wr)
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity)
{
struct whc_node *deferred_free_list;
unsigned n = remove_acked_messages (wr, &deferred_free_list);
whc_free_deferred_free_list (wr->whc, deferred_free_list);
return n;
}
static struct writer * new_writer_guid
(
const struct nn_guid *guid,
const struct nn_guid *group_guid,
struct participant *pp,
const struct sertopic *topic,
const struct nn_xqos *xqos,
status_cb_t status_cb,
void * status_entity
)
{
const size_t sample_overhead = 80; /* INFO_TS + DATA (approximate figure) + inline QoS */
struct writer *wr;
nn_mtime_t tnow = now_mt ();
@ -2782,32 +2747,8 @@ static struct writer * new_writer_guid
}
wr->lease_duration = T_NEVER; /* FIXME */
/* Construct WHC -- if aggressive_keep_last1 is set, the WHC will
drop all samples for which a later update is available. This
forces it to maintain a tlidx. */
{
unsigned hdepth, tldepth;
if (!wr->aggressive_keep_last || wr->xqos->history.kind == NN_KEEP_ALL_HISTORY_QOS)
hdepth = 0;
else
hdepth = (unsigned)wr->xqos->history.depth;
if (wr->handle_as_transient_local) {
if (wr->xqos->durability_service.history.kind == NN_KEEP_ALL_HISTORY_QOS)
tldepth = 0;
else
tldepth = (unsigned)wr->xqos->durability_service.history.depth;
} else if (wr->startup_mode) {
tldepth = hdepth;
} else {
tldepth = 0;
}
if (hdepth == 0 && tldepth == 0)
{
/* no index at all - so no need to bother with startup mode */
wr->startup_mode = 0;
}
wr->whc = whc_new (wr->handle_as_transient_local, hdepth, tldepth, sample_overhead);
if (hdepth > 0)
wr->whc = whc;
if (wr->xqos->history.kind == NN_KEEP_LAST_HISTORY_QOS && wr->aggressive_keep_last)
{
/* hdepth > 0 => "aggressive keep last", and in that case: why
bother blocking for a slow receiver when the entire point of
@ -2821,7 +2762,6 @@ static struct writer * new_writer_guid
wr->whc_high = config.whc_init_highwater_mark.value;
}
assert (!is_builtin_entityid(wr->e.guid.entityid, ownvendorid) || (wr->whc_low == wr->whc_high && wr->whc_low == INT32_MAX));
}
/* Connection admin */
ut_avlInit (&wr_readers_treedef, &wr->readers);
@ -2855,16 +2795,7 @@ static struct writer * new_writer_guid
return wr;
}
struct writer * new_writer
(
struct nn_guid *wrguid,
const struct nn_guid *group_guid,
const struct nn_guid *ppguid,
const struct sertopic *topic,
const struct nn_xqos *xqos,
status_cb_t status_cb,
void * status_cb_arg
)
struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg)
{
struct participant *pp;
struct writer * wr;
@ -2882,7 +2813,7 @@ struct writer * new_writer
wrguid->prefix = pp->e.guid.prefix;
if (pp_allocate_entityid (&wrguid->entityid, entity_kind, pp) < 0)
return NULL;
wr = new_writer_guid (wrguid, group_guid, pp, topic, xqos, status_cb, status_cb_arg);
wr = new_writer_guid (wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg);
return wr;
}
@ -2929,7 +2860,7 @@ static void gc_delete_writer (struct gcreq *gcreq)
(wr->status_cb) (wr->status_cb_entity, NULL);
}
whc_free (wr->whc);
wr->whc->ops->free (wr->whc);
#ifdef DDSI_INCLUDE_SSM
if (wr->ssm_as)
unref_addrset (wr->ssm_as);
@ -3012,6 +2943,7 @@ int delete_writer_nolinger (const struct nn_guid *guid)
int delete_writer (const struct nn_guid *guid)
{
struct writer *wr;
struct whc_state whcst;
if ((wr = ephash_lookup_writer_guid (guid)) == NULL)
{
nn_log (LC_DISCOVERY, "delete_writer(guid %x:%x:%x:%x) - unknown guid\n", PGUID (*guid));
@ -3024,7 +2956,8 @@ int delete_writer (const struct nn_guid *guid)
be the usual case), do it immediately. If more data is still
coming in (which can't really happen at the moment, but might
again in the future) it'll potentially be discarded. */
if (whc_unacked_bytes (wr->whc) == 0)
wr->whc->ops->get_state(wr->whc, &whcst);
if (whcst.unacked_bytes == 0)
{
nn_log (LC_DISCOVERY, "delete_writer(guid %x:%x:%x:%x) - no unack'ed samples\n", PGUID (*guid));
delete_writer_nolinger_locked (wr);
@ -3047,22 +2980,20 @@ int delete_writer (const struct nn_guid *guid)
void writer_exit_startup_mode (struct writer *wr)
{
struct whc_node *deferred_free_list = NULL;
os_mutexLock (&wr->e.lock);
/* startup mode and handle_as_transient_local may not both be set */
assert (!(wr->startup_mode && wr->handle_as_transient_local));
if (!wr->startup_mode)
nn_log (LC_DISCOVERY, " wr %x:%x:%x:%x skipped\n", PGUID (wr->e.guid));
else
if (wr->startup_mode)
{
unsigned n;
assert (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
unsigned cnt = 0;
struct whc_state whcst;
wr->startup_mode = 0;
whc_downgrade_to_volatile (wr->whc);
n = remove_acked_messages_and_free (wr);
cnt += remove_acked_messages (wr, &whcst, &deferred_free_list);
cnt += wr->whc->ops->downgrade_to_volatile (wr->whc, &whcst);
writer_clear_retransmitting (wr);
nn_log (LC_DISCOVERY, " wr %x:%x:%x:%x dropped %u entr%s\n", PGUID (wr->e.guid), n, n == 1 ? "y" : "ies");
nn_log (LC_DISCOVERY, " %x:%x:%x:%x: dropped %u samples\n", PGUID(wr->e.guid), cnt);
}
os_mutexUnlock (&wr->e.lock);
wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list);
}
uint64_t writer_instance_id (const struct nn_guid *guid)
@ -4379,16 +4310,19 @@ static void proxy_reader_set_delete_and_ack_all_messages (struct proxy_reader *p
os_mutexUnlock (&prd->e.lock);
if ((wr = ephash_lookup_writer_guid (&wrguid)) != NULL)
{
struct whc_node *deferred_free_list = NULL;
struct wr_prd_match *m_wr;
os_mutexLock (&wr->e.lock);
if ((m_wr = ut_avlLookup (&wr_readers_treedef, &wr->readers, &prd->e.guid)) != NULL)
{
struct whc_state whcst;
m_wr->seq = MAX_SEQ_NUMBER;
ut_avlAugmentUpdate (&wr_readers_treedef, m_wr);
remove_acked_messages_and_free (wr);
(void)remove_acked_messages (wr, &whcst, &deferred_free_list);
writer_clear_retransmitting (wr);
}
os_mutexUnlock (&wr->e.lock);
wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list);
}
wrguid = wrguid_next;

View file

@ -38,7 +38,6 @@
#include "ddsi/q_ephash.h"
#include "ddsi/q_lease.h"
#include "ddsi/q_gc.h"
#include "ddsi/q_whc.h"
#include "ddsi/q_entity.h"
#include "ddsi/q_nwif.h"
#include "ddsi/q_globals.h"
@ -59,6 +58,7 @@
#include "ddsi/ddsi_mcgroup.h"
#include "dds__tkmap.h"
#include "dds__whc.h"
static void add_peer_addresses (struct addrset *as, const struct config_peer_listelem *list)
{

View file

@ -41,7 +41,6 @@
#include "ddsi/q_ephash.h"
#include "ddsi/q_lease.h"
#include "ddsi/q_gc.h"
#include "ddsi/q_whc.h"
#include "ddsi/q_entity.h"
#include "ddsi/q_xmsg.h"
#include "ddsi/q_receive.h"
@ -52,6 +51,7 @@
#include "ddsi/ddsi_mcgroup.h"
#include "ddsi/sysdeps.h"
#include "dds__whc.h"
/*
Notes:
@ -588,7 +588,7 @@ static int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader
return 0;
}
static void force_heartbeat_to_peer (struct writer *wr, struct proxy_reader *prd, int hbansreq)
static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *whcst, struct proxy_reader *prd, int hbansreq)
{
struct nn_xmsg *m;
@ -603,7 +603,7 @@ static void force_heartbeat_to_peer (struct writer *wr, struct proxy_reader *prd
return;
}
if (whc_empty (wr->whc) && !config.respond_to_rti_init_zero_ack_with_invalid_heartbeat)
if (WHCST_ISEMPTY(whcst) && !config.respond_to_rti_init_zero_ack_with_invalid_heartbeat)
{
/* If WHC is empty, we send a Gap combined with a Heartbeat. The
Gap reuses the latest sequence number (or consumes a new one if
@ -622,14 +622,14 @@ static void force_heartbeat_to_peer (struct writer *wr, struct proxy_reader *prd
UPDATE_SEQ_XMIT_LOCKED(wr, 1);
}
add_Gap (m, wr, prd, seq, seq+1, 1, &bits);
add_Heartbeat (m, wr, hbansreq, prd->e.guid.entityid, 1);
add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 1);
TRACE (("force_heartbeat_to_peer: %x:%x:%x:%x -> %x:%x:%x:%x - whc empty, queueing gap #%"PRId64" + heartbeat for transmit\n",
PGUID (wr->e.guid), PGUID (prd->e.guid), seq));
}
else
{
/* Send a Heartbeat just to this peer */
add_Heartbeat (m, wr, hbansreq, prd->e.guid.entityid, 0);
add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 0);
TRACE (("force_heartbeat_to_peer: %x:%x:%x:%x -> %x:%x:%x:%x - queue for transmit\n",
PGUID (wr->e.guid), PGUID (prd->e.guid)));
}
@ -638,7 +638,7 @@ static void force_heartbeat_to_peer (struct writer *wr, struct proxy_reader *prd
static seqno_t grow_gap_to_next_seq (const struct writer *wr, seqno_t seq)
{
seqno_t next_seq = whc_next_seq (wr->whc, seq - 1);
seqno_t next_seq = wr->whc->ops->next_seq (wr->whc, seq - 1);
seqno_t seq_xmit = READ_SEQ_XMIT(wr);
if (next_seq == MAX_SEQ_NUMBER) /* no next sample */
return seq_xmit + 1;
@ -715,6 +715,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
uint32_t msgs_sent, msgs_lost;
seqno_t max_seq_in_reply;
struct whc_node *deferred_free_list = NULL;
struct whc_state whcst;
unsigned i;
int hb_sent_in_response = 0;
memset (gapbits, 0, sizeof (gapbits));
@ -828,19 +829,21 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
rn->seq = wr->seq;
}
ut_avlAugmentUpdate (&wr_readers_treedef, rn);
n = remove_acked_messages (wr, &deferred_free_list);
n = remove_acked_messages (wr, &whcst, &deferred_free_list);
TRACE ((" ACK%"PRId64" RM%u", n_ack, n));
}
else
{
/* There's actually no guarantee that we need this information */
wr->whc->ops->get_state(wr->whc, &whcst);
}
/* If this reader was marked as "non-responsive" in the past, it's now responding again,
so update its status */
if (rn->seq == MAX_SEQ_NUMBER && prd->c.xqos->reliability.kind == NN_RELIABLE_RELIABILITY_QOS)
{
seqno_t oldest_seq;
if (whc_empty (wr->whc))
oldest_seq = wr->seq;
else
oldest_seq = whc_max_seq (wr->whc);
oldest_seq = WHCST_ISEMPTY(&whcst) ? wr->seq : whcst.max_seq;
rn->has_replied_to_hb = 1; /* was temporarily cleared to ensure heartbeats went out */
rn->seq = seqbase - 1;
if (oldest_seq > rn->seq) {
@ -880,19 +883,19 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
data in our WHC, we start sending it regardless of whether the
remote reader asked for it */
TRACE ((" preemptive-nack"));
if (whc_empty (wr->whc))
if (WHCST_ISEMPTY(&whcst))
{
TRACE ((" whc-empty "));
force_heartbeat_to_peer (wr, prd, 0);
force_heartbeat_to_peer (wr, &whcst, prd, 0);
hb_sent_in_response = 1;
}
else
{
TRACE ((" rebase "));
force_heartbeat_to_peer (wr, prd, 0);
force_heartbeat_to_peer (wr, &whcst, prd, 0);
hb_sent_in_response = 1;
numbits = config.accelerate_rexmit_block_size;
seqbase = whc_min_seq (wr->whc);
seqbase = whcst.min_seq;
}
}
else if (!rn->assumed_in_sync)
@ -940,25 +943,25 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
if (i >= msg->readerSNState.numbits || nn_bitset_isset (numbits, msg->readerSNState.bits, i))
{
seqno_t seq = seqbase + i;
struct whc_node *whcn;
if ((whcn = whc_findseq (wr->whc, seq)) != NULL)
struct whc_borrowed_sample sample;
if (wr->whc->ops->borrow_sample (wr->whc, seq, &sample))
{
if (!wr->retransmitting && whcn->unacked)
if (!wr->retransmitting && sample.unacked)
writer_set_retransmitting (wr);
if (config.retransmit_merging != REXMIT_MERGE_NEVER && rn->assumed_in_sync)
{
/* send retransmit to all receivers, but skip if recently done */
nn_mtime_t tstamp = now_mt ();
if (tstamp.v > whcn->last_rexmit_ts.v + config.retransmit_merging_period)
if (tstamp.v > sample.last_rexmit_ts.v + config.retransmit_merging_period)
{
TRACE ((" RX%"PRId64, seqbase + i));
enqueued = (enqueue_sample_wrlock_held (wr, seq, whcn->plist, whcn->serdata, NULL, 0) >= 0);
enqueued = (enqueue_sample_wrlock_held (wr, seq, sample.plist, sample.serdata, NULL, 0) >= 0);
if (enqueued)
{
max_seq_in_reply = seqbase + i;
msgs_sent++;
whcn->last_rexmit_ts = tstamp;
sample.last_rexmit_ts = tstamp;
}
}
else
@ -970,14 +973,16 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
{
/* no merging, send directed retransmit */
TRACE ((" RX%"PRId64"", seqbase + i));
enqueued = (enqueue_sample_wrlock_held (wr, seq, whcn->plist, whcn->serdata, prd, 0) >= 0);
enqueued = (enqueue_sample_wrlock_held (wr, seq, sample.plist, sample.serdata, prd, 0) >= 0);
if (enqueued)
{
max_seq_in_reply = seqbase + i;
msgs_sent++;
whcn->rexmit_count++;
sample.rexmit_count++;
}
}
wr->whc->ops->return_sample(wr->whc, &sample, true);
}
else if (gapstart == -1)
{
@ -1054,7 +1059,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
if (msgs_sent && max_seq_in_reply < seq_xmit)
{
TRACE ((" rexmit#%"PRIu32" maxseq:%"PRId64"<%"PRId64"<=%"PRId64"", msgs_sent, max_seq_in_reply, seq_xmit, wr->seq));
force_heartbeat_to_peer (wr, prd, 1);
force_heartbeat_to_peer (wr, &whcst, prd, 1);
hb_sent_in_response = 1;
/* The primary purpose of hbcontrol_note_asyncwrite is to ensure
@ -1062,16 +1067,16 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
gradually lowering rate. If we just got a request for a
retransmit, and there is more to be retransmitted, surely the
rate should be kept up for now */
writer_hbcontrol_note_asyncwrite (wr, now_mt ());
writer_hbcontrol_note_asyncwrite (wr, &whcst, now_mt ());
}
/* If "final" flag not set, we must respond with a heartbeat. Do it
now if we haven't done so already */
if (!(msg->smhdr.flags & ACKNACK_FLAG_FINAL) && !hb_sent_in_response)
force_heartbeat_to_peer (wr, prd, 0);
force_heartbeat_to_peer (wr, &whcst, prd, 0);
TRACE ((")"));
out:
os_mutexUnlock (&wr->e.lock);
whc_free_deferred_free_list (wr->whc, deferred_free_list);
wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list);
return 1;
}
@ -1428,7 +1433,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
struct proxy_reader *prd;
struct wr_prd_match *rn;
struct writer *wr;
struct whc_node *whcn;
struct whc_borrowed_sample sample;
nn_guid_t src, dst;
nn_count_t *countp;
seqno_t seq = fromSN (msg->writerSN);
@ -1493,7 +1498,25 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
/* Resend the requested fragments if we still have the sample, send
a Gap if we don't have them anymore. */
if ((whcn = whc_findseq (wr->whc, seq)) == NULL)
if (wr->whc->ops->borrow_sample (wr->whc, seq, &sample))
{
const unsigned base = msg->fragmentNumberState.bitmap_base - 1;
int enqueued = 1;
TRACE ((" scheduling requested frags ...\n"));
for (i = 0; i < msg->fragmentNumberState.numbits && enqueued; i++)
{
if (nn_bitset_isset (msg->fragmentNumberState.numbits, msg->fragmentNumberState.bits, i))
{
struct nn_xmsg *reply;
if (create_fragment_message (wr, seq, sample.plist, sample.serdata, base + i, prd, &reply, 0) < 0)
enqueued = 0;
else
enqueued = qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0);
}
}
wr->whc->ops->return_sample (wr->whc, &sample, false);
}
else
{
static unsigned zero = 0;
struct nn_xmsg *m;
@ -1512,30 +1535,15 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
qxev_msg (wr->evq, m);
}
}
else
{
const unsigned base = msg->fragmentNumberState.bitmap_base - 1;
int enqueued = 1;
TRACE ((" scheduling requested frags ...\n"));
for (i = 0; i < msg->fragmentNumberState.numbits && enqueued; i++)
{
if (nn_bitset_isset (msg->fragmentNumberState.numbits, msg->fragmentNumberState.bits, i))
{
struct nn_xmsg *reply;
if (create_fragment_message (wr, seq, whcn->plist, whcn->serdata, base + i, prd, &reply, 0) < 0)
enqueued = 0;
else
enqueued = qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0);
}
}
}
if (seq < READ_SEQ_XMIT(wr))
{
/* Not everything was retransmitted yet, so force a heartbeat out
to give the reader a chance to nack the rest and make sure
hearbeats will go out at a reasonably high rate for a while */
force_heartbeat_to_peer (wr, prd, 1);
writer_hbcontrol_note_asyncwrite (wr, now_mt ());
struct whc_state whcst;
wr->whc->ops->get_state(wr->whc, &whcst);
force_heartbeat_to_peer (wr, &whcst, prd, 1);
writer_hbcontrol_note_asyncwrite (wr, &whcst, now_mt ());
}
out:

View file

@ -15,7 +15,6 @@
#include "os/os.h"
#include "util/ut_avl.h"
#include "ddsi/q_whc.h"
#include "ddsi/q_entity.h"
#include "ddsi/q_addrset.h"
#include "ddsi/q_xmsg.h"
@ -36,6 +35,7 @@
#include "ddsi/ddsi_ser.h"
#include "ddsi/sysdeps.h"
#include "dds__whc.h"
#if __STDC_VERSION__ >= 199901L
#define POS_INFINITY_DOUBLE INFINITY
@ -84,7 +84,7 @@ static void writer_hbcontrol_note_hb (struct writer *wr, nn_mtime_t tnow, int an
hbc->hbs_since_last_write++;
}
int64_t writer_hbcontrol_intv (const struct writer *wr, UNUSED_ARG (nn_mtime_t tnow))
int64_t writer_hbcontrol_intv (const struct writer *wr, const struct whc_state *whcst, UNUSED_ARG (nn_mtime_t tnow))
{
struct hbcontrol const * const hbc = &wr->hbcontrol;
int64_t ret = config.const_hb_intv_sched;
@ -97,7 +97,7 @@ int64_t writer_hbcontrol_intv (const struct writer *wr, UNUSED_ARG (nn_mtime_t t
ret *= 2;
}
n_unacked = whc_unacked_bytes (wr->whc);
n_unacked = whcst->unacked_bytes;
if (n_unacked >= wr->whc_low + 3 * (wr->whc_high - wr->whc_low) / 4)
ret /= 2;
if (n_unacked >= wr->whc_low + (wr->whc_high - wr->whc_low) / 2)
@ -109,7 +109,7 @@ int64_t writer_hbcontrol_intv (const struct writer *wr, UNUSED_ARG (nn_mtime_t t
return ret;
}
void writer_hbcontrol_note_asyncwrite (struct writer *wr, nn_mtime_t tnow)
void writer_hbcontrol_note_asyncwrite (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow)
{
struct hbcontrol * const hbc = &wr->hbcontrol;
nn_mtime_t tnext;
@ -131,13 +131,13 @@ void writer_hbcontrol_note_asyncwrite (struct writer *wr, nn_mtime_t tnow)
}
}
int writer_hbcontrol_must_send (const struct writer *wr, nn_mtime_t tnow /* monotonic */)
int writer_hbcontrol_must_send (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow /* monotonic */)
{
struct hbcontrol const * const hbc = &wr->hbcontrol;
return (tnow.v >= hbc->t_of_last_hb.v + writer_hbcontrol_intv (wr, tnow));
return (tnow.v >= hbc->t_of_last_hb.v + writer_hbcontrol_intv (wr, whcst, tnow));
}
struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, nn_mtime_t tnow, int hbansreq, int issync)
struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, int hbansreq, int issync)
{
struct nn_xmsg *msg;
const nn_guid_t *prd_guid;
@ -200,7 +200,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, nn_mtime_t
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
add_Heartbeat (msg, wr, hbansreq, to_entityid (NN_ENTITYID_UNKNOWN), issync);
add_Heartbeat (msg, wr, whcst, hbansreq, to_entityid (NN_ENTITYID_UNKNOWN), issync);
}
else
{
@ -221,14 +221,14 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, nn_mtime_t
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
add_Heartbeat (msg, wr, hbansreq, prd_guid->entityid, issync);
add_Heartbeat (msg, wr, whcst, hbansreq, prd_guid->entityid, issync);
}
writer_hbcontrol_note_hb (wr, tnow, hbansreq);
return msg;
}
static int writer_hbcontrol_ack_required_generic (const struct writer *wr, nn_mtime_t tlast, nn_mtime_t tnow, int piggyback)
static int writer_hbcontrol_ack_required_generic (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tlast, nn_mtime_t tnow, int piggyback)
{
struct hbcontrol const * const hbc = &wr->hbcontrol;
const int64_t hb_intv_ack = config.const_hb_intv_sched;
@ -251,7 +251,7 @@ static int writer_hbcontrol_ack_required_generic (const struct writer *wr, nn_mt
return 2;
}
if (whc_unacked_bytes (wr->whc) >= wr->whc_low + (wr->whc_high - wr->whc_low) / 2)
if (whcst->unacked_bytes >= wr->whc_low + (wr->whc_high - wr->whc_low) / 2)
{
if (tnow.v >= hbc->t_of_last_ackhb.v + config.const_hb_intv_sched_min)
return 2;
@ -262,13 +262,13 @@ static int writer_hbcontrol_ack_required_generic (const struct writer *wr, nn_mt
return 0;
}
int writer_hbcontrol_ack_required (const struct writer *wr, nn_mtime_t tnow)
int writer_hbcontrol_ack_required (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow)
{
struct hbcontrol const * const hbc = &wr->hbcontrol;
return writer_hbcontrol_ack_required_generic (wr, hbc->t_of_last_write, tnow, 0);
return writer_hbcontrol_ack_required_generic (wr, whcst, hbc->t_of_last_write, tnow, 0);
}
struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, nn_mtime_t tnow, unsigned packetid, int *hbansreq)
struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, unsigned packetid, int *hbansreq)
{
struct hbcontrol * const hbc = &wr->hbcontrol;
unsigned last_packetid;
@ -284,20 +284,20 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, nn_mtime_t tnow,
/* Update statistics, intervals, scheduling of heartbeat event,
&c. -- there's no real difference between async and sync so we
reuse the async version. */
writer_hbcontrol_note_asyncwrite (wr, tnow);
writer_hbcontrol_note_asyncwrite (wr, whcst, tnow);
*hbansreq = writer_hbcontrol_ack_required_generic (wr, tlast, tnow, 1);
*hbansreq = writer_hbcontrol_ack_required_generic (wr, whcst, tlast, tnow, 1);
if (*hbansreq >= 2) {
/* So we force a heartbeat in - but we also rely on our caller to
send the packet out */
msg = writer_hbcontrol_create_heartbeat (wr, tnow, *hbansreq, 1);
msg = writer_hbcontrol_create_heartbeat (wr, whcst, tnow, *hbansreq, 1);
} else if (last_packetid != packetid) {
/* If we crossed a packet boundary since the previous write,
piggyback a heartbeat, with *hbansreq determining whether or
not an ACK is needed. We don't force the packet out either:
this is just to ensure a regular flow of ACKs for cleaning up
the WHC & for allowing readers to NACK missing samples. */
msg = writer_hbcontrol_create_heartbeat (wr, tnow, *hbansreq, 1);
msg = writer_hbcontrol_create_heartbeat (wr, whcst, tnow, *hbansreq, 1);
} else {
*hbansreq = 0;
msg = NULL;
@ -311,13 +311,13 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, nn_mtime_t tnow,
(hbc->tsched.v == T_NEVER) ? POS_INFINITY_DOUBLE : (double) (hbc->tsched.v - tnow.v) / 1e9,
ut_avlIsEmpty (&wr->readers) ? -1 : root_rdmatch (wr)->min_seq,
ut_avlIsEmpty (&wr->readers) || root_rdmatch (wr)->all_have_replied_to_hb ? "" : "!",
whc_empty (wr->whc) ? -1 : whc_max_seq (wr->whc), READ_SEQ_XMIT(wr)));
whcst->max_seq, READ_SEQ_XMIT(wr)));
}
return msg;
}
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, int hbansreq, nn_entityid_t dst, int issync)
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, nn_entityid_t dst, int issync)
{
struct nn_xmsg_marker sm_marker;
Heartbeat_t * hb;
@ -343,7 +343,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, int hbansreq, nn_ent
hb->readerId = nn_hton_entityid (dst);
hb->writerId = nn_hton_entityid (wr->e.guid.entityid);
if (whc_empty (wr->whc))
if (WHCST_ISEMPTY(whcst))
{
/* Really don't have data. Fake one at the current wr->seq.
We're not really allowed to generate heartbeats when the WHC is
@ -360,7 +360,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, int hbansreq, nn_ent
else
{
seqno_t seq_xmit;
min = whc_min_seq (wr->whc);
min = whcst->min_seq;
max = wr->seq;
seq_xmit = READ_SEQ_XMIT(wr);
assert (min <= max);
@ -669,7 +669,7 @@ static int must_skip_frag (const char *frags_to_skip, unsigned frag)
}
#endif
static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *wr, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew, unsigned nfrags)
static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew, unsigned nfrags)
{
unsigned i;
#if 0
@ -714,8 +714,7 @@ static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *
struct nn_xmsg *msg = NULL;
int hbansreq;
os_mutexLock (&wr->e.lock);
msg = writer_hbcontrol_piggyback
(wr, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq);
msg = writer_hbcontrol_piggyback (wr, whcst, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq);
os_mutexUnlock (&wr->e.lock);
if (msg)
{
@ -726,7 +725,7 @@ static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *
}
}
static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew)
static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew)
{
/* on entry: &wr->e.lock held; on exit: lock no longer held */
struct nn_xmsg *fmsg;
@ -739,7 +738,7 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr,
unsigned nfrags;
os_mutexUnlock (&wr->e.lock);
nfrags = (sz + config.fragment_size - 1) / config.fragment_size;
transmit_sample_lgmsg_unlocked (xp, wr, seq, plist, serdata, prd, isnew, nfrags);
transmit_sample_lgmsg_unlocked (xp, wr, whcst, seq, plist, serdata, prd, isnew, nfrags);
return;
}
else if (create_fragment_message_simple (wr, seq, serdata, &fmsg) < 0)
@ -754,7 +753,7 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr,
/* Note: wr->heartbeat_xevent != NULL <=> wr is reliable */
if (wr->heartbeat_xevent)
hmsg = writer_hbcontrol_piggyback (wr, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq);
hmsg = writer_hbcontrol_piggyback (wr, whcst, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq);
else
hmsg = NULL;
@ -855,7 +854,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
if (!do_insert)
res = 0;
else if ((insres = whc_insert (wr->whc, writer_max_drop_seq (wr), seq, plist, serdata, tk)) < 0)
else if ((insres = wr->whc->ops->insert (wr->whc, writer_max_drop_seq (wr), seq, plist, serdata, tk)) < 0)
res = insres;
else
res = 1;
@ -863,16 +862,18 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
#ifndef NDEBUG
if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
{
if (whc_findmax (wr->whc) == NULL)
struct whc_state whcst;
wr->whc->ops->get_state(wr->whc, &whcst);
if (WHCST_ISEMPTY(&whcst))
assert (wr->c.pp->builtins_deleted);
}
#endif
return res;
}
static int writer_may_continue (const struct writer *wr)
static int writer_may_continue (const struct writer *wr, const struct whc_state *whcst)
{
return (whc_unacked_bytes (wr->whc) <= wr->whc_low && !wr->retransmitting) || (wr->state != WRST_OPERATIONAL);
return (whcst->unacked_bytes <= wr->whc_low && !wr->retransmitting) || (wr->state != WRST_OPERATIONAL);
}
@ -915,7 +916,8 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr)
os_result result = os_resultSuccess;
nn_mtime_t tnow = now_mt ();
const nn_mtime_t abstimeout = add_duration_to_mtime (tnow, nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time));
size_t n_unacked = whc_unacked_bytes (wr->whc);
struct whc_state whcst;
wr->whc->ops->get_state(wr->whc, &whcst);
{
nn_vendorid_t ownvendorid = MY_VENDOR_ID;
@ -925,7 +927,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr)
assert (!is_builtin_entityid(wr->e.guid.entityid, ownvendorid));
}
nn_log (LC_THROTTLE, "writer %x:%x:%x:%x waiting for whc to shrink below low-water mark (whc %"PRIuSIZE" low=%u high=%u)\n", PGUID (wr->e.guid), n_unacked, wr->whc_low, wr->whc_high);
nn_log (LC_THROTTLE, "writer %x:%x:%x:%x waiting for whc to shrink below low-water mark (whc %"PRIuSIZE" low=%u high=%u)\n", PGUID (wr->e.guid), whcst.unacked_bytes, wr->whc_low, wr->whc_high);
wr->throttling = 1;
wr->throttle_count++;
@ -934,7 +936,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr)
things the wrong way round ... */
if (xp)
{
struct nn_xmsg *hbmsg = writer_hbcontrol_create_heartbeat (wr, tnow, 1, 1);
struct nn_xmsg *hbmsg = writer_hbcontrol_create_heartbeat (wr, &whcst, tnow, 1, 1);
os_mutexUnlock (&wr->e.lock);
if (hbmsg)
{
@ -944,7 +946,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr)
os_mutexLock (&wr->e.lock);
}
while (gv.rtps_keepgoing && !writer_may_continue (wr))
while (gv.rtps_keepgoing && !writer_may_continue (wr, &whcst))
{
int64_t reltimeout;
tnow = now_mt ();
@ -958,6 +960,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr)
thread_state_asleep (lookup_thread_state());
result = os_condTimedWait (&wr->throttle_cond, &wr->e.lock, &timeout);
thread_state_awake (lookup_thread_state());
wr->whc->ops->get_state(wr->whc, &whcst);
}
if (result == os_resultTimeout)
{
@ -972,8 +975,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr)
os_condBroadcast (&wr->throttle_cond);
}
n_unacked = whc_unacked_bytes (wr->whc);
nn_log (LC_THROTTLE, "writer %x:%x:%x:%x done waiting for whc to shrink below low-water mark (whc %"PRIuSIZE" low=%u high=%u)\n", PGUID (wr->e.guid), n_unacked, wr->whc_low, wr->whc_high);
nn_log (LC_THROTTLE, "writer %x:%x:%x:%x done waiting for whc to shrink below low-water mark (whc %"PRIuSIZE" low=%u high=%u)\n", PGUID (wr->e.guid), whcst.unacked_bytes, wr->whc_low, wr->whc_high);
return result;
}
@ -1028,8 +1030,9 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p
/* If WHC overfull, block. */
{
size_t unacked_bytes = whc_unacked_bytes (wr->whc);
if (unacked_bytes > wr->whc_high)
struct whc_state whcst;
wr->whc->ops->get_state(wr->whc, &whcst);
if (whcst.unacked_bytes > wr->whc_high)
{
os_result ores;
assert(gc_allowed); /* also see beginning of the function */
@ -1038,7 +1041,7 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p
else
{
maybe_grow_whc (wr);
if (unacked_bytes <= wr->whc_high)
if (whcst.unacked_bytes <= wr->whc_high)
ores = os_resultSuccess;
else
ores = throttle_writer (xp, wr);
@ -1081,6 +1084,10 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p
}
else
{
struct whc_state whcst;
if (wr->heartbeat_xevent)
wr->whc->ops->get_state(wr->whc, &whcst);
/* Note the subtlety of enqueueing with the lock held but
transmitting without holding the lock. Still working on
cleaning that up. */
@ -1098,14 +1105,14 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p
plist_copy = &plist_stk;
nn_plist_copy (plist_copy, plist);
}
transmit_sample_unlocks_wr (xp, wr, seq, plist_copy, serdata, NULL, 1);
transmit_sample_unlocks_wr (xp, wr, &whcst, seq, plist_copy, serdata, NULL, 1);
if (plist_copy)
nn_plist_fini (plist_copy);
}
else
{
if (wr->heartbeat_xevent)
writer_hbcontrol_note_asyncwrite (wr, tnow);
writer_hbcontrol_note_asyncwrite (wr, &whcst, tnow);
enqueue_sample_wrlock_held (wr, seq, plist, serdata, NULL, 1);
os_mutexUnlock (&wr->e.lock);
}

View file

@ -21,7 +21,6 @@
#include "ddsi/q_log.h"
#include "ddsi/q_addrset.h"
#include "ddsi/q_xmsg.h"
#include "ddsi/q_whc.h"
#include "ddsi/q_xevent.h"
#include "ddsi/q_thread.h"
#include "ddsi/q_config.h"
@ -39,6 +38,7 @@
#include "ddsi/q_xmsg.h"
#include "q__osplser.h"
#include "ddsi/ddsi_ser.h"
#include "dds__whc.h"
#include "ddsi/sysdeps.h"
@ -596,6 +596,7 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt
struct writer *wr;
nn_mtime_t t_next;
int hbansreq = 0;
struct whc_state whcst;
if ((wr = ephash_lookup_writer_guid (&ev->u.heartbeat.wr_guid)) == NULL)
{
@ -606,23 +607,24 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt
assert (wr->reliable);
os_mutexLock (&wr->e.lock);
if (!writer_must_have_hb_scheduled (wr))
wr->whc->ops->get_state(wr->whc, &whcst);
if (!writer_must_have_hb_scheduled (wr, &whcst))
{
hbansreq = 1; /* just for trace */
msg = NULL; /* Need not send it now, and no need to schedule it for the future */
t_next.v = T_NEVER;
}
else if (!writer_hbcontrol_must_send (wr, tnow))
else if (!writer_hbcontrol_must_send (wr, &whcst, tnow))
{
hbansreq = 1; /* just for trace */
msg = NULL;
t_next.v = tnow.v + writer_hbcontrol_intv (wr, tnow);
t_next.v = tnow.v + writer_hbcontrol_intv (wr, &whcst, tnow);
}
else
{
hbansreq = writer_hbcontrol_ack_required (wr, tnow);
msg = writer_hbcontrol_create_heartbeat (wr, tnow, hbansreq, 0);
t_next.v = tnow.v + writer_hbcontrol_intv (wr, tnow);
hbansreq = writer_hbcontrol_ack_required (wr, &whcst, tnow);
msg = writer_hbcontrol_create_heartbeat (wr, &whcst, tnow, hbansreq, 0);
t_next.v = tnow.v + writer_hbcontrol_intv (wr, &whcst, tnow);
}
TRACE (("heartbeat(wr %x:%x:%x:%x%s) %s, resched in %g s (min-ack %"PRId64"%s, avail-seq %"PRId64", xmit %"PRId64")\n",
@ -632,7 +634,7 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt
(t_next.v == T_NEVER) ? POS_INFINITY_DOUBLE : (double)(t_next.v - tnow.v) / 1e9,
ut_avlIsEmpty (&wr->readers) ? (seqno_t) -1 : ((struct wr_prd_match *) ut_avlRootNonEmpty (&wr_readers_treedef, &wr->readers))->min_seq,
ut_avlIsEmpty (&wr->readers) || ((struct wr_prd_match *) ut_avlRootNonEmpty (&wr_readers_treedef, &wr->readers))->all_have_replied_to_hb ? "" : "!",
whc_empty (wr->whc) ? (seqno_t) -1 : whc_max_seq (wr->whc), READ_SEQ_XMIT(wr)));
whcst.max_seq, READ_SEQ_XMIT(wr)));
resched_xevent_if_earlier (ev, t_next);
wr->hbcontrol.tsched = t_next;
os_mutexUnlock (&wr->e.lock);
@ -963,10 +965,13 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
struct participant *pp;
struct proxy_reader *prd;
struct writer *spdp_wr;
struct whc_node *whcn;
struct whc_borrowed_sample sample;
serstate_t st;
serdata_t sd;
nn_guid_t kh;
#ifndef NDEBUG
bool sample_found;
#endif
if ((pp = ephash_lookup_participant_guid (&ev->u.spdp.pp_guid)) == NULL)
{
@ -1011,21 +1016,31 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
sd = ddsi_serstate_fix (st);
os_mutexLock (&spdp_wr->e.lock);
if ((whcn = whc_findkey (spdp_wr->whc, sd)) != NULL)
if (spdp_wr->whc->ops->borrow_sample_key (spdp_wr->whc, sd, &sample))
{
/* Claiming it is new rather than a retransmit so that the rexmit
limiting won't kick in. It is best-effort and therefore the
updating of the last transmitted sequence number won't take
place anyway. Nor is it necessary to fiddle with heartbeat
control stuff. */
enqueue_sample_wrlock_held (spdp_wr, whcn->seq, whcn->plist, whcn->serdata, prd, 1);
enqueue_sample_wrlock_held (spdp_wr, sample.seq, sample.plist, sample.serdata, prd, 1);
spdp_wr->whc->ops->return_sample(spdp_wr->whc, &sample, false);
#ifndef NDEBUG
sample_found = true;
#endif
}
#ifndef NDEBUG
else
{
sample_found = false;
}
#endif
os_mutexUnlock (&spdp_wr->e.lock);
ddsi_serdata_unref (sd);
#ifndef NDEBUG
if (whcn == NULL)
if (!sample_found)
{
/* If undirected, it is pp->spdp_xevent, and that one must never
run into an empty WHC unless it is already marked for deletion.