wrap indirect calls to WHC in inline functions

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2018-10-29 13:27:13 +08:00
parent 3e343d032a
commit f2f436bde3
9 changed files with 151 additions and 82 deletions

View file

@ -54,6 +54,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
q_time.c
q_transmit.c
q_inverse_uint32_set.c
q_whc.c
q_xevent.c
q_xmsg.c
q_freelist.c

View file

@ -42,8 +42,12 @@ struct whc_state {
an iter on the stack without specifying an implementation. If future changes or
implementations require more, these can be adjusted. An implementation should check
things fit at compile time. */
#define WHC_SAMPLE_ITER_SIZE (2*sizeof(void *))
#define WHC_SAMPLE_ITER_SIZE (1*sizeof(void *))
struct whc_sample_iter_base {
struct whc *whc;
};
struct whc_sample_iter {
struct whc_sample_iter_base c;
union {
char opaque[WHC_SAMPLE_ITER_SIZE];
/* cover alignment requirements: */
@ -90,6 +94,43 @@ struct whc {
const struct whc_ops *ops;
};
inline seqno_t whc_next_seq (const struct whc *whc, seqno_t seq) {
return whc->ops->next_seq (whc, seq);
}
inline void whc_get_state (const struct whc *whc, struct whc_state *st) {
whc->ops->get_state (whc, st);
}
inline bool whc_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample) {
return whc->ops->borrow_sample (whc, seq, sample);
}
inline bool whc_borrow_sample_key (const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample) {
return whc->ops->borrow_sample_key (whc, serdata_key, sample);
}
inline void whc_return_sample (struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info) {
whc->ops->return_sample (whc, sample, update_retransmit_info);
}
inline void whc_sample_iter_init (const struct whc *whc, struct whc_sample_iter *it) {
whc->ops->sample_iter_init (whc, it);
}
inline bool whc_sample_iter_borrow_next (struct whc_sample_iter *it, struct whc_borrowed_sample *sample) {
return it->c.whc->ops->sample_iter_borrow_next (it, sample);
}
inline void whc_free (struct whc *whc) {
whc->ops->free (whc);
}
inline int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk) {
return whc->ops->insert (whc, max_drop_seq, seq, plist, serdata, tk);
}
inline unsigned whc_downgrade_to_volatile (struct whc *whc, struct whc_state *st) {
return whc->ops->downgrade_to_volatile (whc, st);
}
inline unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list) {
return whc->ops->remove_acked_messages (whc, max_drop_seq, whcst, deferred_free_list);
}
inline void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list) {
whc->ops->free_deferred_free_list (whc, deferred_free_list);
}
#if defined (__cplusplus)
}
#endif

View file

@ -188,7 +188,7 @@ static int print_participants (struct thread_state1 *self, ddsi_tran_conn_t conn
continue;
os_mutexLock (&w->e.lock);
print_endpoint_common (conn, "wr", &w->e, &w->c, w->xqos, w->topic);
w->whc->ops->get_state(w->whc, &whcst);
whc_get_state(w->whc, &whcst);
x += cpf (conn, " whc [%lld,%lld] unacked %"PRIuSIZE"%s [%u,%u] seq %lld seq_xmit %lld cs_seq %lld\n",
whcst.min_seq, whcst.max_seq, whcst.unacked_bytes,
w->throttling ? " THROTTLING" : "",

View file

@ -1314,7 +1314,7 @@ static void writer_drop_connection (const struct nn_guid * wr_guid, const struct
}
}
os_mutexUnlock (&wr->e.lock);
wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list);
whc_free_deferred_free_list (wr->whc, deferred_free_list);
free_wr_prd_match (m);
}
}
@ -1606,8 +1606,8 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd)
{
struct whc_sample_iter it;
struct whc_borrowed_sample sample;
wr->whc->ops->sample_iter_init(wr->whc, &it);
while (wr->whc->ops->sample_iter_borrow_next(&it, &sample))
whc_sample_iter_init(wr->whc, &it);
while (whc_sample_iter_borrow_next(&it, &sample))
{
struct proxy_writer_info pwr_info;
struct ddsi_serdata *payload = sample.serdata;
@ -2540,7 +2540,7 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, stru
unsigned n;
assert (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
ASSERT_MUTEX_HELD (&wr->e.lock);
n = wr->whc->ops->remove_acked_messages (wr->whc, writer_max_drop_seq (wr), whcst, deferred_free_list);
n = whc_remove_acked_messages (wr->whc, writer_max_drop_seq (wr), whcst, deferred_free_list);
/* when transitioning from >= low-water to < low-water, signal
anyone waiting in throttle_writer() */
if (wr->throttling && whcst->unacked_bytes <= wr->whc_low)
@ -2854,7 +2854,7 @@ static void gc_delete_writer (struct gcreq *gcreq)
(wr->status_cb) (wr->status_cb_entity, NULL);
}
wr->whc->ops->free (wr->whc);
whc_free (wr->whc);
#ifdef DDSI_INCLUDE_SSM
if (wr->ssm_as)
unref_addrset (wr->ssm_as);
@ -2950,7 +2950,7 @@ int delete_writer (const struct nn_guid *guid)
be the usual case), do it immediately. If more data is still
coming in (which can't really happen at the moment, but might
again in the future) it'll potentially be discarded. */
wr->whc->ops->get_state(wr->whc, &whcst);
whc_get_state(wr->whc, &whcst);
if (whcst.unacked_bytes == 0)
{
nn_log (LC_DISCOVERY, "delete_writer(guid %x:%x:%x:%x) - no unack'ed samples\n", PGUID (*guid));
@ -2982,12 +2982,12 @@ void writer_exit_startup_mode (struct writer *wr)
struct whc_state whcst;
wr->startup_mode = 0;
cnt += remove_acked_messages (wr, &whcst, &deferred_free_list);
cnt += wr->whc->ops->downgrade_to_volatile (wr->whc, &whcst);
cnt += whc_downgrade_to_volatile (wr->whc, &whcst);
writer_clear_retransmitting (wr);
nn_log (LC_DISCOVERY, " %x:%x:%x:%x: dropped %u samples\n", PGUID(wr->e.guid), cnt);
}
os_mutexUnlock (&wr->e.lock);
wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list);
whc_free_deferred_free_list (wr->whc, deferred_free_list);
}
uint64_t writer_instance_id (const struct nn_guid *guid)
@ -4316,7 +4316,7 @@ static void proxy_reader_set_delete_and_ack_all_messages (struct proxy_reader *p
writer_clear_retransmitting (wr);
}
os_mutexUnlock (&wr->e.lock);
wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list);
whc_free_deferred_free_list (wr->whc, deferred_free_list);
}
wrguid = wrguid_next;

View file

@ -638,7 +638,7 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *
static seqno_t grow_gap_to_next_seq (const struct writer *wr, seqno_t seq)
{
seqno_t next_seq = wr->whc->ops->next_seq (wr->whc, seq - 1);
seqno_t next_seq = whc_next_seq (wr->whc, seq - 1);
seqno_t seq_xmit = READ_SEQ_XMIT(wr);
if (next_seq == MAX_SEQ_NUMBER) /* no next sample */
return seq_xmit + 1;
@ -835,7 +835,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
else
{
/* There's actually no guarantee that we need this information */
wr->whc->ops->get_state(wr->whc, &whcst);
whc_get_state(wr->whc, &whcst);
}
/* If this reader was marked as "non-responsive" in the past, it's now responding again,
@ -944,7 +944,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
{
seqno_t seq = seqbase + i;
struct whc_borrowed_sample sample;
if (wr->whc->ops->borrow_sample (wr->whc, seq, &sample))
if (whc_borrow_sample (wr->whc, seq, &sample))
{
if (!wr->retransmitting && sample.unacked)
writer_set_retransmitting (wr);
@ -982,7 +982,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
}
}
wr->whc->ops->return_sample(wr->whc, &sample, true);
whc_return_sample(wr->whc, &sample, true);
}
else if (gapstart == -1)
{
@ -1076,7 +1076,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
TRACE ((")"));
out:
os_mutexUnlock (&wr->e.lock);
wr->whc->ops->free_deferred_free_list (wr->whc, deferred_free_list);
whc_free_deferred_free_list (wr->whc, deferred_free_list);
return 1;
}
@ -1498,7 +1498,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
/* Resend the requested fragments if we still have the sample, send
a Gap if we don't have them anymore. */
if (wr->whc->ops->borrow_sample (wr->whc, seq, &sample))
if (whc_borrow_sample (wr->whc, seq, &sample))
{
const unsigned base = msg->fragmentNumberState.bitmap_base - 1;
int enqueued = 1;
@ -1514,7 +1514,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
enqueued = qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0);
}
}
wr->whc->ops->return_sample (wr->whc, &sample, false);
whc_return_sample (wr->whc, &sample, false);
}
else
{
@ -1541,7 +1541,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
to give the reader a chance to nack the rest and make sure
hearbeats will go out at a reasonably high rate for a while */
struct whc_state whcst;
wr->whc->ops->get_state(wr->whc, &whcst);
whc_get_state(wr->whc, &whcst);
force_heartbeat_to_peer (wr, &whcst, prd, 1);
writer_hbcontrol_note_asyncwrite (wr, now_mt ());
}

View file

@ -884,7 +884,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
if (!do_insert)
res = 0;
else if ((insres = wr->whc->ops->insert (wr->whc, writer_max_drop_seq (wr), seq, plist, serdata, tk)) < 0)
else if ((insres = whc_insert (wr->whc, writer_max_drop_seq (wr), seq, plist, serdata, tk)) < 0)
res = insres;
else
res = 1;
@ -893,7 +893,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
{
struct whc_state whcst;
wr->whc->ops->get_state(wr->whc, &whcst);
whc_get_state(wr->whc, &whcst);
if (WHCST_ISEMPTY(&whcst))
assert (wr->c.pp->builtins_deleted);
}
@ -947,7 +947,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr)
nn_mtime_t tnow = now_mt ();
const nn_mtime_t abstimeout = add_duration_to_mtime (tnow, nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time));
struct whc_state whcst;
wr->whc->ops->get_state(wr->whc, &whcst);
whc_get_state(wr->whc, &whcst);
{
#ifndef NDEBUG
@ -992,7 +992,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr)
thread_state_asleep (lookup_thread_state());
result = os_condTimedWait (&wr->throttle_cond, &wr->e.lock, &timeout);
thread_state_awake (lookup_thread_state());
wr->whc->ops->get_state(wr->whc, &whcst);
whc_get_state(wr->whc, &whcst);
}
if (result == os_resultTimeout)
{
@ -1064,7 +1064,7 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p
/* If WHC overfull, block. */
{
struct whc_state whcst;
wr->whc->ops->get_state(wr->whc, &whcst);
whc_get_state(wr->whc, &whcst);
if (whcst.unacked_bytes > wr->whc_high)
{
os_result ores;
@ -1119,7 +1119,7 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p
{
struct whc_state whcst;
if (wr->heartbeat_xevent)
wr->whc->ops->get_state(wr->whc, &whcst);
whc_get_state(wr->whc, &whcst);
/* Note the subtlety of enqueueing with the lock held but
transmitting without holding the lock. Still working on

28
src/core/ddsi/src/q_whc.c Normal file
View file

@ -0,0 +1,28 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include "os/os.h"
#include "ddsi/q_rtps.h"
#include "ddsi/q_time.h"
#include "ddsi/q_whc.h"
extern inline seqno_t whc_next_seq (const struct whc *whc, seqno_t seq);
extern inline void whc_get_state (const struct whc *whc, struct whc_state *st);
extern inline bool whc_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample);
extern inline bool whc_borrow_sample_key (const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample);
extern inline void whc_return_sample (struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info);
extern inline void whc_sample_iter_init (const struct whc *whc, struct whc_sample_iter *it);
extern inline bool whc_sample_iter_borrow_next (struct whc_sample_iter *it, struct whc_borrowed_sample *sample);
extern inline void whc_free (struct whc *whc);
extern int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk);
extern unsigned whc_downgrade_to_volatile (struct whc *whc, struct whc_state *st);
extern unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list);
extern void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list);

View file

@ -607,7 +607,7 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt
assert (wr->reliable);
os_mutexLock (&wr->e.lock);
wr->whc->ops->get_state(wr->whc, &whcst);
whc_get_state(wr->whc, &whcst);
if (!writer_must_have_hb_scheduled (wr, &whcst))
{
hbansreq = 1; /* just for trace */
@ -1023,7 +1023,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
nn_xmsg_free (mpayload);
os_mutexLock (&spdp_wr->e.lock);
if (spdp_wr->whc->ops->borrow_sample_key (spdp_wr->whc, sd, &sample))
if (whc_borrow_sample_key (spdp_wr->whc, sd, &sample))
{
/* Claiming it is new rather than a retransmit so that the rexmit
limiting won't kick in. It is best-effort and therefore the
@ -1031,7 +1031,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
place anyway. Nor is it necessary to fiddle with heartbeat
control stuff. */
enqueue_sample_wrlock_held (spdp_wr, sample.seq, sample.plist, sample.serdata, prd, 1);
spdp_wr->whc->ops->return_sample(spdp_wr->whc, &sample, false);
whc_return_sample(spdp_wr->whc, &sample, false);
#ifndef NDEBUG
sample_found = true;
#endif