Merge remote-tracking branch 'upstream/master' into security
This commit is contained in:
		
						commit
						54fad0d601
					
				
					 11 changed files with 214 additions and 37 deletions
				
			
		| 
						 | 
				
			
			@ -1311,7 +1311,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
 | 
			
		|||
        newn->idxnode = idxn;
 | 
			
		||||
        newn->idxnode_pos = idxn->headidx;
 | 
			
		||||
 | 
			
		||||
        if (oldn && (whc->wrinfo.hdepth > 0 || oldn->seq <= max_drop_seq) && whc->wrinfo.tldepth > 0)
 | 
			
		||||
        if (oldn && (whc->wrinfo.hdepth > 0 || oldn->seq <= max_drop_seq) && (!whc->wrinfo.is_transient_local || whc->wrinfo.tldepth > 0))
 | 
			
		||||
        {
 | 
			
		||||
          TRACE (" prune whcn %p", (void *)oldn);
 | 
			
		||||
          assert (oldn != whc->maxseq_node || whc->wrinfo.has_deadline);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -110,27 +110,41 @@ static dds_entity_t create_and_sync_reader(dds_entity_t subscriber, dds_entity_t
 | 
			
		|||
  return reader;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max)
 | 
			
		||||
static void get_writer_whc_state (dds_entity_t writer, struct whc_state *whcst)
 | 
			
		||||
{
 | 
			
		||||
  struct dds_entity *wr_entity;
 | 
			
		||||
  struct writer *wr;
 | 
			
		||||
  struct whc_state whcst;
 | 
			
		||||
  CU_ASSERT_EQUAL_FATAL(dds_entity_pin(writer, &wr_entity), 0);
 | 
			
		||||
  thread_state_awake(lookup_thread_state(), &wr_entity->m_domain->gv);
 | 
			
		||||
  wr = entidx_lookup_writer_guid(wr_entity->m_domain->gv.entity_index, &wr_entity->m_guid);
 | 
			
		||||
  CU_ASSERT_FATAL(wr != NULL);
 | 
			
		||||
  assert(wr != NULL); /* for Clang's static analyzer */
 | 
			
		||||
  whc_get_state(wr->whc, &whcst);
 | 
			
		||||
  whc_get_state(wr->whc, whcst);
 | 
			
		||||
  thread_state_asleep(lookup_thread_state());
 | 
			
		||||
  dds_entity_unpin(wr_entity);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void check_intermediate_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max)
 | 
			
		||||
{
 | 
			
		||||
  struct whc_state whcst;
 | 
			
		||||
  get_writer_whc_state (writer, &whcst);
 | 
			
		||||
  /* WHC must not contain any samples < exp_min and must contain at least exp_max if it
 | 
			
		||||
     contains at least one sample.  (We never know for certain when ACKs arrive.) */
 | 
			
		||||
  printf(" -- intermediate state: unacked: %zu; min %"PRId64" (exp %"PRId64"); max %"PRId64" (exp %"PRId64")\n", whcst.unacked_bytes, whcst.min_seq, exp_min, whcst.max_seq, exp_max);
 | 
			
		||||
  CU_ASSERT_FATAL (whcst.min_seq >= exp_min || (whcst.min_seq == -1 && whcst.max_seq == -1));
 | 
			
		||||
  CU_ASSERT_FATAL (whcst.max_seq == exp_max || (whcst.min_seq == -1 && whcst.max_seq == -1));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max)
 | 
			
		||||
{
 | 
			
		||||
  struct whc_state whcst;
 | 
			
		||||
  get_writer_whc_state (writer, &whcst);
 | 
			
		||||
  printf(" -- final state: unacked: %zu; min %"PRId64" (exp %"PRId64"); max %"PRId64" (exp %"PRId64")\n", whcst.unacked_bytes, whcst.min_seq, exp_min, whcst.max_seq, exp_max);
 | 
			
		||||
  CU_ASSERT_EQUAL_FATAL (whcst.unacked_bytes, 0);
 | 
			
		||||
  CU_ASSERT_EQUAL_FATAL (whcst.min_seq, exp_min);
 | 
			
		||||
  CU_ASSERT_EQUAL_FATAL (whcst.max_seq, exp_max);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#define V DDS_DURABILITY_VOLATILE
 | 
			
		||||
#define TL DDS_DURABILITY_TRANSIENT_LOCAL
 | 
			
		||||
#define R DDS_RELIABILITY_RELIABLE
 | 
			
		||||
| 
						 | 
				
			
			@ -191,6 +205,23 @@ static void test_whc_end_state(dds_durability_kind_t d, dds_reliability_kind_t r
 | 
			
		|||
      ret = dds_write (writer, &sample_keyless);
 | 
			
		||||
      CU_ASSERT_FATAL (ret == DDS_RETCODE_OK);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* if history is truly keep last, there may never be more data present than the max of the
 | 
			
		||||
       history depth(s) */
 | 
			
		||||
    if (r == R && h != KA && (d == V || dh != KA))
 | 
			
		||||
    {
 | 
			
		||||
      if (rrd || d != V)
 | 
			
		||||
      {
 | 
			
		||||
        int32_t depth = (d == V || hd >= dhd) ? hd : dhd;
 | 
			
		||||
        int32_t exp_max = ni * (s + 1);
 | 
			
		||||
        int32_t exp_min = exp_max - ni * (depth - 1) - (ni - 1);
 | 
			
		||||
        check_intermediate_whc_state (writer, exp_min, exp_max);
 | 
			
		||||
      }
 | 
			
		||||
      else
 | 
			
		||||
      {
 | 
			
		||||
        check_intermediate_whc_state (writer, -1, -1);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /* delete readers, wait until no matching reader */
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,6 +13,7 @@
 | 
			
		|||
#include <string.h>
 | 
			
		||||
 | 
			
		||||
#include "dds/ddsrt/endian.h"
 | 
			
		||||
#include "dds/ddsrt/heap.h"
 | 
			
		||||
#include "dds/ddsrt/log.h"
 | 
			
		||||
#include "dds/ddsrt/sockets.h"
 | 
			
		||||
#include "dds/ddsi/ddsi_ipaddr.h"
 | 
			
		||||
| 
						 | 
				
			
			@ -103,6 +104,7 @@ enum ddsi_locator_from_string_result ddsi_ipaddr_from_string (ddsi_tran_factory_
 | 
			
		|||
          return AFSR_UNKNOWN;
 | 
			
		||||
      }
 | 
			
		||||
      memcpy(&tmpaddr, &hent->addrs[0], sizeof(hent->addrs[0]));
 | 
			
		||||
      ddsrt_free (hent);
 | 
			
		||||
#else
 | 
			
		||||
      return AFSR_INVALID;
 | 
			
		||||
#endif
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -699,6 +699,7 @@ static bool ddsi_tcp_supports (const struct ddsi_tran_factory *fact_cmn, int32_t
 | 
			
		|||
 | 
			
		||||
static int ddsi_tcp_locator (struct ddsi_tran_factory *fact_cmn, ddsi_tran_base_t base, nn_locator_t *loc)
 | 
			
		||||
{
 | 
			
		||||
  loc->tran = fact_cmn;
 | 
			
		||||
  loc->kind = fact_cmn->m_kind;
 | 
			
		||||
  memcpy(loc->address, base->gv->extloc.address, sizeof(loc->address));
 | 
			
		||||
  loc->port = base->m_port;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -243,6 +243,7 @@ int ddsrt_atomic_cas64 (volatile ddsrt_atomic_uint64_t *x, uint64_t exp, uint64_
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
#define DDSRT_FAKE_ATOMIC64(name, oper, ret) \
 | 
			
		||||
  uint64_t ddsrt_atomic_##name##64_##ret (volatile ddsrt_atomic_uint64_t *x, uint64_t v); \
 | 
			
		||||
  uint64_t ddsrt_atomic_##name##64_##ret (volatile ddsrt_atomic_uint64_t *x, uint64_t v) \
 | 
			
		||||
  { \
 | 
			
		||||
    const uint64_t idx = atomic64_lock_index (x); \
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -167,14 +167,37 @@ guess_iftype (const PIP_ADAPTER_ADDRESSES iface)
 | 
			
		|||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int
 | 
			
		||||
static dds_return_t
 | 
			
		||||
copyname(const wchar_t *wstr, char **strp)
 | 
			
		||||
{
 | 
			
		||||
  int cnt, len;
 | 
			
		||||
  char buf[1], *str;
 | 
			
		||||
 | 
			
		||||
  len = WideCharToMultiByte(
 | 
			
		||||
    CP_UTF8, WC_ERR_INVALID_CHARS, wstr, -1, buf, 0, NULL, NULL);
 | 
			
		||||
  if (len == 0) {
 | 
			
		||||
    return DDS_RETCODE_BAD_PARAMETER;
 | 
			
		||||
  } else if ((str = ddsrt_malloc_s(len)) == NULL) {
 | 
			
		||||
    return DDS_RETCODE_OUT_OF_RESOURCES;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  cnt = WideCharToMultiByte(
 | 
			
		||||
    CP_UTF8, WC_ERR_INVALID_CHARS, wstr, -1, str, len, NULL, NULL);
 | 
			
		||||
  assert(cnt == len);
 | 
			
		||||
  assert(str[len - 1] == '\0');
 | 
			
		||||
 | 
			
		||||
  *strp = str;
 | 
			
		||||
  return DDS_RETCODE_OK;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static dds_return_t
 | 
			
		||||
copyaddr(
 | 
			
		||||
  ddsrt_ifaddrs_t **ifap,
 | 
			
		||||
  const PIP_ADAPTER_ADDRESSES iface,
 | 
			
		||||
  const PMIB_IPADDRTABLE addrtable,
 | 
			
		||||
  const PIP_ADAPTER_UNICAST_ADDRESS addr)
 | 
			
		||||
{
 | 
			
		||||
  dds_return_t err = DDS_RETCODE_OK;
 | 
			
		||||
  dds_return_t rc = DDS_RETCODE_OK;
 | 
			
		||||
  ddsrt_ifaddrs_t *ifa;
 | 
			
		||||
  struct sockaddr *sa;
 | 
			
		||||
  size_t sz;
 | 
			
		||||
| 
						 | 
				
			
			@ -187,15 +210,17 @@ copyaddr(
 | 
			
		|||
  sz = (size_t)addr->Address.iSockaddrLength;
 | 
			
		||||
 | 
			
		||||
  if ((ifa = ddsrt_calloc_s(1, sizeof(*ifa))) == NULL) {
 | 
			
		||||
    err = DDS_RETCODE_OUT_OF_RESOURCES;
 | 
			
		||||
    rc = DDS_RETCODE_OUT_OF_RESOURCES;
 | 
			
		||||
  } else {
 | 
			
		||||
    ifa->flags = getflags(iface);
 | 
			
		||||
    ifa->type = guess_iftype(iface);
 | 
			
		||||
    ifa->addr = ddsrt_memdup(sa, sz);
 | 
			
		||||
    (void)ddsrt_asprintf(&ifa->name, "%wS", iface->FriendlyName);
 | 
			
		||||
    if (ifa->addr == NULL || ifa->name == NULL) {
 | 
			
		||||
      err = DDS_RETCODE_OUT_OF_RESOURCES;
 | 
			
		||||
    } else if (ifa->addr->sa_family == AF_INET6) {
 | 
			
		||||
    if ((ifa->addr = ddsrt_memdup(sa, sz)) == NULL) {
 | 
			
		||||
      rc = DDS_RETCODE_OUT_OF_RESOURCES;
 | 
			
		||||
    } else {
 | 
			
		||||
      rc = copyname(iface->FriendlyName, &ifa->name);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (ifa->addr->sa_family == AF_INET6) {
 | 
			
		||||
      ifa->index = iface->Ipv6IfIndex;
 | 
			
		||||
 | 
			
		||||
      /* Address is not in addrtable if the interface is not connected. */
 | 
			
		||||
| 
						 | 
				
			
			@ -222,18 +247,18 @@ copyaddr(
 | 
			
		|||
      if ((ifa->netmask = ddsrt_memdup(&nm, sz)) == NULL ||
 | 
			
		||||
          (ifa->broadaddr = ddsrt_memdup(&bc, sz)) == NULL)
 | 
			
		||||
      {
 | 
			
		||||
        err = DDS_RETCODE_OUT_OF_RESOURCES;
 | 
			
		||||
        rc = DDS_RETCODE_OUT_OF_RESOURCES;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (err == 0) {
 | 
			
		||||
  if (rc == DDS_RETCODE_OK) {
 | 
			
		||||
    *ifap = ifa;
 | 
			
		||||
  } else {
 | 
			
		||||
    ddsrt_freeifaddrs(ifa);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return err;
 | 
			
		||||
  return rc;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
dds_return_t
 | 
			
		||||
| 
						 | 
				
			
			@ -241,7 +266,7 @@ ddsrt_getifaddrs(
 | 
			
		|||
  ddsrt_ifaddrs_t **ifap,
 | 
			
		||||
  const int *afs)
 | 
			
		||||
{
 | 
			
		||||
  int err = 0;
 | 
			
		||||
  dds_return_t rc = DDS_RETCODE_OK;
 | 
			
		||||
  int use;
 | 
			
		||||
  PIP_ADAPTER_ADDRESSES ifaces = NULL, iface;
 | 
			
		||||
  PIP_ADAPTER_UNICAST_ADDRESS addr = NULL;
 | 
			
		||||
| 
						 | 
				
			
			@ -257,12 +282,15 @@ ddsrt_getifaddrs(
 | 
			
		|||
 | 
			
		||||
  ifa = ifa_root = ifa_next = NULL;
 | 
			
		||||
 | 
			
		||||
  if ((err = getifaces(&ifaces)) == DDS_RETCODE_OK &&
 | 
			
		||||
      (err = getaddrtable(&addrtable)) == DDS_RETCODE_OK)
 | 
			
		||||
  if ((rc = getifaces(&ifaces)) == DDS_RETCODE_OK &&
 | 
			
		||||
      (rc = getaddrtable(&addrtable)) == DDS_RETCODE_OK)
 | 
			
		||||
  {
 | 
			
		||||
    for (iface = ifaces; !err && iface != NULL; iface = iface->Next) {
 | 
			
		||||
    for (iface = ifaces;
 | 
			
		||||
         iface != NULL && rc == DDS_RETCODE_OK;
 | 
			
		||||
         iface = iface->Next)
 | 
			
		||||
    {
 | 
			
		||||
      for (addr = iface->FirstUnicastAddress;
 | 
			
		||||
           addr != NULL;
 | 
			
		||||
           addr != NULL && rc == DDS_RETCODE_OK;
 | 
			
		||||
           addr = addr->Next)
 | 
			
		||||
      {
 | 
			
		||||
        sa = (struct sockaddr *)addr->Address.lpSockaddr;
 | 
			
		||||
| 
						 | 
				
			
			@ -272,8 +300,8 @@ ddsrt_getifaddrs(
 | 
			
		|||
        }
 | 
			
		||||
 | 
			
		||||
        if (use) {
 | 
			
		||||
          err = copyaddr(&ifa_next, iface, addrtable, addr);
 | 
			
		||||
          if (err == DDS_RETCODE_OK) {
 | 
			
		||||
          rc = copyaddr(&ifa_next, iface, addrtable, addr);
 | 
			
		||||
          if (rc == DDS_RETCODE_OK) {
 | 
			
		||||
            if (ifa == NULL) {
 | 
			
		||||
              ifa = ifa_root = ifa_next;
 | 
			
		||||
            } else {
 | 
			
		||||
| 
						 | 
				
			
			@ -289,11 +317,11 @@ ddsrt_getifaddrs(
 | 
			
		|||
  ddsrt_free(ifaces);
 | 
			
		||||
  ddsrt_free(addrtable);
 | 
			
		||||
 | 
			
		||||
  if (err == DDS_RETCODE_OK) {
 | 
			
		||||
  if (rc == DDS_RETCODE_OK) {
 | 
			
		||||
    *ifap = ifa_root;
 | 
			
		||||
  } else {
 | 
			
		||||
    ddsrt_freeifaddrs(ifa_root);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return err;
 | 
			
		||||
  return rc;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -184,6 +184,11 @@ bool record_cputime (struct record_cputime_state *state, const char *prefix, dds
 | 
			
		|||
  return print_cputime (&state->s, prefix, false, true);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
double record_cputime_read_rss (const struct record_cputime_state *state)
 | 
			
		||||
{
 | 
			
		||||
  return state->s.maxrss;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct record_cputime_state *record_cputime_new (dds_entity_t wr)
 | 
			
		||||
{
 | 
			
		||||
  ddsrt_thread_list_id_t tids[100];
 | 
			
		||||
| 
						 | 
				
			
			@ -251,6 +256,12 @@ bool record_cputime (struct record_cputime_state *state, const char *prefix, dds
 | 
			
		|||
  (void) tnow;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
double record_cputime_read_rss (const struct record_cputime_state *state)
 | 
			
		||||
{
 | 
			
		||||
  (void) state;
 | 
			
		||||
  return 0.0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct record_cputime_state *record_cputime_new (dds_entity_t wr)
 | 
			
		||||
{
 | 
			
		||||
  (void) wr;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,6 +19,7 @@ struct record_cputime_state;
 | 
			
		|||
struct record_cputime_state *record_cputime_new (dds_entity_t wr);
 | 
			
		||||
void record_cputime_free (struct record_cputime_state *state);
 | 
			
		||||
bool record_cputime (struct record_cputime_state *state, const char *prefix, dds_time_t tnow);
 | 
			
		||||
double record_cputime_read_rss (const struct record_cputime_state *state);
 | 
			
		||||
bool print_cputime (const struct CPUStats *s, const char *prefix, bool print_host, bool is_fresh);
 | 
			
		||||
 | 
			
		||||
#endif
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -151,6 +151,18 @@ static dds_duration_t ping_intv;
 | 
			
		|||
   pongs had been received */
 | 
			
		||||
static uint32_t ping_timeouts = 0;
 | 
			
		||||
 | 
			
		||||
/* Maximum allowed increase in RSS between 2nd RSS sample and
 | 
			
		||||
   final RSS sample: final one must be <=
 | 
			
		||||
   init * (1 + rss_factor/100) + rss_term  */
 | 
			
		||||
static bool rss_check = false;
 | 
			
		||||
static double rss_factor = 0;
 | 
			
		||||
static double rss_term = 0;
 | 
			
		||||
 | 
			
		||||
/* Minimum number of samples, minimum number of roundtrips to
 | 
			
		||||
   declare the run a success */
 | 
			
		||||
static uint64_t min_received = 0;
 | 
			
		||||
static uint64_t min_roundtrips = 0;
 | 
			
		||||
 | 
			
		||||
static ddsrt_mutex_t disc_lock;
 | 
			
		||||
 | 
			
		||||
/* Publisher statistics and lock protecting it */
 | 
			
		||||
| 
						 | 
				
			
			@ -207,6 +219,7 @@ struct subthread_arg_pongstat {
 | 
			
		|||
  uint64_t min, max;
 | 
			
		||||
  uint64_t sum;
 | 
			
		||||
  uint32_t cnt;
 | 
			
		||||
  uint64_t totcnt;
 | 
			
		||||
  uint64_t *raw;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -733,6 +746,7 @@ static bool update_roundtrip (dds_instance_handle_t pubhandle, uint64_t tdelta,
 | 
			
		|||
      if (x->cnt < PINGPONG_RAWSIZE)
 | 
			
		||||
        x->raw[x->cnt] = tdelta;
 | 
			
		||||
      x->cnt++;
 | 
			
		||||
      x->totcnt++;
 | 
			
		||||
      ddsrt_mutex_unlock (&pongstat_lock);
 | 
			
		||||
      return allseen;
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			@ -742,6 +756,7 @@ static bool update_roundtrip (dds_instance_handle_t pubhandle, uint64_t tdelta,
 | 
			
		|||
  x->pphandle = get_pphandle_for_pubhandle (pubhandle);
 | 
			
		||||
  x->min = x->max = x->sum = tdelta;
 | 
			
		||||
  x->cnt = 1;
 | 
			
		||||
  x->totcnt = 1;
 | 
			
		||||
  x->raw = malloc (PINGPONG_RAWSIZE * sizeof (*x->raw));
 | 
			
		||||
  x->raw[0] = tdelta;
 | 
			
		||||
  npongstat++;
 | 
			
		||||
| 
						 | 
				
			
			@ -1343,7 +1358,7 @@ static int cmp_uint64 (const void *va, const void *vb)
 | 
			
		|||
  return (*a == *b) ? 0 : (*a < *b) ? -1 : 1;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state)
 | 
			
		||||
static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state)
 | 
			
		||||
{
 | 
			
		||||
  char prefix[128];
 | 
			
		||||
  const double ts = (double) (tnow - tref) / 1e9;
 | 
			
		||||
| 
						 | 
				
			
			@ -1469,6 +1484,7 @@ static void print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, str
 | 
			
		|||
  if (output)
 | 
			
		||||
    record_netload (netload_state, prefix, tnow);
 | 
			
		||||
  fflush (stdout);
 | 
			
		||||
  return output;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void subthread_arg_init (struct subthread_arg *arg, dds_entity_t rd, uint32_t max_samples)
 | 
			
		||||
| 
						 | 
				
			
			@ -1555,8 +1571,14 @@ OPTIONS:\n\
 | 
			
		|||
  -d DEV:BW           report network load for device DEV with nominal\n\
 | 
			
		||||
                      bandwidth BW in bits/s (e.g., eth0:1e9)\n\
 | 
			
		||||
  -D DUR              run for at most DUR seconds\n\
 | 
			
		||||
  -N COUNT            require at least COUNT matching participants\n\
 | 
			
		||||
  -M DUR              require those participants to match within DUR seconds\n\
 | 
			
		||||
  -Q KEY:VAL          set success criteria\n\
 | 
			
		||||
                        rss:X%%       max allowed increase in RSS, in %%\n\
 | 
			
		||||
                        rss:X         max allowed increase in RSS, in MB\n\
 | 
			
		||||
                        samples:N     min received messages by \"sub\"\n\
 | 
			
		||||
                        roundtrips:N  min roundtrips for \"pong\"\n\
 | 
			
		||||
                        minmatch:N    require >= N matching participants\n\
 | 
			
		||||
                        maxwait:DUR   require those participants to match\n\
 | 
			
		||||
                                      within DUR seconds\n\
 | 
			
		||||
  -R TREF             timestamps in the output relative to TREF instead of\n\
 | 
			
		||||
                      process start\n\
 | 
			
		||||
  -i ID               use domain ID instead of the default domain\n\
 | 
			
		||||
| 
						 | 
				
			
			@ -1867,7 +1889,7 @@ int main (int argc, char *argv[])
 | 
			
		|||
 | 
			
		||||
  argv0 = argv[0];
 | 
			
		||||
 | 
			
		||||
  while ((opt = getopt (argc, argv, "cd:D:i:n:k:uLK:T:M:N:R:h")) != EOF)
 | 
			
		||||
  while ((opt = getopt (argc, argv, "cd:D:i:n:k:uLK:T:Q:R:h")) != EOF)
 | 
			
		||||
  {
 | 
			
		||||
    switch (opt)
 | 
			
		||||
    {
 | 
			
		||||
| 
						 | 
				
			
			@ -1897,11 +1919,28 @@ int main (int argc, char *argv[])
 | 
			
		|||
        else if (strcmp (optarg, "UK1024") == 0) topicsel = UK1024;
 | 
			
		||||
        else error3 ("%s: unknown topic\n", optarg);
 | 
			
		||||
        break;
 | 
			
		||||
      case 'M': maxwait = atof (optarg); if (maxwait <= 0) maxwait = HUGE_VAL; break;
 | 
			
		||||
      case 'N': minmatch = (unsigned) atoi (optarg); break;
 | 
			
		||||
      case 'Q': {
 | 
			
		||||
        int pos;
 | 
			
		||||
        double d;
 | 
			
		||||
        unsigned long n;
 | 
			
		||||
        if (sscanf (optarg, "rss:%lf%n", &d, &pos) == 1 && (optarg[pos] == 0 || optarg[pos] == '%')) {
 | 
			
		||||
          if (optarg[pos] == 0) rss_term = d * 1048576.0; else rss_factor = 1.0 + d / 100.0;
 | 
			
		||||
          rss_check = true;
 | 
			
		||||
        } else if (sscanf (optarg, "samples:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) {
 | 
			
		||||
          min_received = (uint64_t) n;
 | 
			
		||||
        } else if (sscanf (optarg, "roundtrips:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) {
 | 
			
		||||
          min_roundtrips = (uint64_t) n;
 | 
			
		||||
        } else if (sscanf (optarg, "maxwait:%lf%n", &maxwait, &pos) == 1 && optarg[pos] == 0) {
 | 
			
		||||
          maxwait = (maxwait <= 0) ? HUGE_VAL : maxwait;
 | 
			
		||||
        } else if (sscanf (optarg, "minmatch:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) {
 | 
			
		||||
          minmatch = (uint32_t) n;
 | 
			
		||||
        } else {
 | 
			
		||||
          error3 ("-Q%s: invalid success criterium\n", optarg);
 | 
			
		||||
        }
 | 
			
		||||
        break;
 | 
			
		||||
      }
 | 
			
		||||
      case 'R': tref = 0; sscanf (optarg, "%"SCNd64, &tref); break;
 | 
			
		||||
      case 'h': usage (); break;
 | 
			
		||||
      default: error3 ("-%c: unknown option\n", opt); break;
 | 
			
		||||
      case 'h': default: usage (); break;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -2006,11 +2045,17 @@ int main (int argc, char *argv[])
 | 
			
		|||
  /* participants reader must exist before the "publication matched" or "subscription matched"
 | 
			
		||||
     listener is invoked, or it won't be able to get the details (FIXME: even the DDS spec
 | 
			
		||||
     has convenience functions for that ...) */
 | 
			
		||||
  if ((rd_participants = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0)
 | 
			
		||||
    error2 ("dds_create_reader(participants) failed: %d\n", (int) rd_participants);
 | 
			
		||||
  /* set listener later: DATA_AVAILABLE still has the nasty habit of potentially triggering
 | 
			
		||||
     before the reader is accessible to the application via its handle */
 | 
			
		||||
  listener = dds_create_listener (NULL);
 | 
			
		||||
  dds_lset_data_available (listener, participant_data_listener);
 | 
			
		||||
  if ((rd_participants = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, listener)) < 0)
 | 
			
		||||
    error2 ("dds_create_reader(participants) failed: %d\n", (int) rd_participants);
 | 
			
		||||
  dds_set_listener (rd_participants, listener);
 | 
			
		||||
  dds_delete_listener (listener);
 | 
			
		||||
  /* then there is the matter of data arriving prior to setting the listener ... this state
 | 
			
		||||
     of affairs is undoubtedly a bug */
 | 
			
		||||
  participant_data_listener (rd_participants, NULL);
 | 
			
		||||
  if ((rd_subscriptions = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, NULL)) < 0)
 | 
			
		||||
    error2 ("dds_create_reader(subscriptions) failed: %d\n", (int) rd_subscriptions);
 | 
			
		||||
  if ((rd_publications = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL)) < 0)
 | 
			
		||||
| 
						 | 
				
			
			@ -2155,7 +2200,7 @@ int main (int argc, char *argv[])
 | 
			
		|||
  const bool pingpong_waitset = (ping_intv != DDS_NEVER && ignorelocal == DDS_IGNORELOCAL_NONE) || pingpongmode == SM_WAITSET;
 | 
			
		||||
  if (pingpong_waitset)
 | 
			
		||||
  {
 | 
			
		||||
    ddsrt_thread_create (&subpingtid, "ping", &attr, subpingthread_waitset, &subarg_pong);
 | 
			
		||||
    ddsrt_thread_create (&subpingtid, "ping", &attr, subpingthread_waitset, &subarg_ping);
 | 
			
		||||
    ddsrt_thread_create (&subpongtid, "pong", &attr, subpongthread_waitset, &subarg_pong);
 | 
			
		||||
  }
 | 
			
		||||
  else
 | 
			
		||||
| 
						 | 
				
			
			@ -2179,6 +2224,7 @@ int main (int argc, char *argv[])
 | 
			
		|||
  dds_time_t tnext = tstart + DDS_SECS (1);
 | 
			
		||||
  dds_time_t tlast = tstart;
 | 
			
		||||
  dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv;
 | 
			
		||||
  double rss_init = 0.0, rss_final = 0.0;
 | 
			
		||||
  while (!ddsrt_atomic_ld32 (&termflag) && tnow < tstop)
 | 
			
		||||
  {
 | 
			
		||||
    dds_time_t twakeup = DDS_NEVER;
 | 
			
		||||
| 
						 | 
				
			
			@ -2244,12 +2290,17 @@ int main (int argc, char *argv[])
 | 
			
		|||
    tnow = dds_time ();
 | 
			
		||||
    if (tnext <= tnow)
 | 
			
		||||
    {
 | 
			
		||||
      print_stats (tref, tnow, tlast, cputime_state, netload_state);
 | 
			
		||||
      bool output;
 | 
			
		||||
      output = print_stats (tref, tnow, tlast, cputime_state, netload_state);
 | 
			
		||||
      tlast = tnow;
 | 
			
		||||
      if (tnow > tnext + DDS_MSECS (500))
 | 
			
		||||
        tnext = tnow + DDS_SECS (1);
 | 
			
		||||
      else
 | 
			
		||||
        tnext += DDS_SECS (1);
 | 
			
		||||
 | 
			
		||||
      if (rss_init == 0.0 && matchcount >= minmatch && output)
 | 
			
		||||
        rss_init = record_cputime_read_rss (cputime_state);
 | 
			
		||||
      rss_final = record_cputime_read_rss (cputime_state);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* If a "real" ping doesn't result in the expected number of pongs within a reasonable
 | 
			
		||||
| 
						 | 
				
			
			@ -2317,8 +2368,13 @@ int main (int argc, char *argv[])
 | 
			
		|||
  dds_delete (rd_data);
 | 
			
		||||
 | 
			
		||||
  uint64_t nlost = 0;
 | 
			
		||||
  bool received_ok = true;
 | 
			
		||||
  for (uint32_t i = 0; i < eseq_admin.nph; i++)
 | 
			
		||||
  {
 | 
			
		||||
    nlost += eseq_admin.stats[i].nlost;
 | 
			
		||||
    if (eseq_admin.stats[i].nrecv < (uint64_t) min_received)
 | 
			
		||||
      received_ok = false;
 | 
			
		||||
  }
 | 
			
		||||
  fini_eseq_admin (&eseq_admin);
 | 
			
		||||
  subthread_arg_fini (&subarg_data);
 | 
			
		||||
  subthread_arg_fini (&subarg_ping);
 | 
			
		||||
| 
						 | 
				
			
			@ -2330,8 +2386,13 @@ int main (int argc, char *argv[])
 | 
			
		|||
  ddsrt_mutex_destroy (&pubstat_lock);
 | 
			
		||||
  hist_free (pubstat_hist);
 | 
			
		||||
  free (pongwr);
 | 
			
		||||
  bool roundtrips_ok = true;
 | 
			
		||||
  for (uint32_t i = 0; i < npongstat; i++)
 | 
			
		||||
  {
 | 
			
		||||
    if (pongstat[i].totcnt < min_roundtrips)
 | 
			
		||||
      roundtrips_ok = false;
 | 
			
		||||
    free (pongstat[i].raw);
 | 
			
		||||
  }
 | 
			
		||||
  free (pongstat);
 | 
			
		||||
 | 
			
		||||
  bool ok = true;
 | 
			
		||||
| 
						 | 
				
			
			@ -2360,5 +2421,20 @@ int main (int argc, char *argv[])
 | 
			
		|||
    printf ("[%"PRIdPID"] error: %"PRIu64" samples lost\n", ddsrt_getpid (), nlost);
 | 
			
		||||
    ok = false;
 | 
			
		||||
  }
 | 
			
		||||
  if (!roundtrips_ok)
 | 
			
		||||
  {
 | 
			
		||||
    printf ("[%"PRIdPID"] error: too few roundtrips for some peers\n", ddsrt_getpid ());
 | 
			
		||||
    ok = false;
 | 
			
		||||
  }
 | 
			
		||||
  if (!received_ok)
 | 
			
		||||
  {
 | 
			
		||||
    printf ("[%"PRIdPID"] error: too few samples received from some peers\n", ddsrt_getpid ());
 | 
			
		||||
    ok = false;
 | 
			
		||||
  }
 | 
			
		||||
  if (rss_check && rss_final >= rss_init * rss_factor + rss_term)
 | 
			
		||||
  {
 | 
			
		||||
    printf ("[%"PRIdPID"] error: RSS grew too much (%f -> %f)\n", ddsrt_getpid (), rss_init, rss_final);
 | 
			
		||||
    ok = false;
 | 
			
		||||
  }
 | 
			
		||||
  return ok ? 0 : 1;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										23
									
								
								src/tools/ddsperf/sanity.bash
									
										
									
									
									
										Executable file
									
								
							
							
						
						
									
										23
									
								
								src/tools/ddsperf/sanity.bash
									
										
									
									
									
										Executable file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,23 @@
 | 
			
		|||
exitcode=0
 | 
			
		||||
# RSS/samples/roundtrip numbers are based on experimentation on Travis
 | 
			
		||||
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 -Qsamples:300000 -Qroundtrips:3000 sub ping & ddsperf_pids=$!
 | 
			
		||||
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 pub & ddsperf_pids="$ddsperf_pids $!"
 | 
			
		||||
sleep 11
 | 
			
		||||
for pid in $ddsperf_pids ; do
 | 
			
		||||
    if kill -0 $pid 2>/dev/null ; then
 | 
			
		||||
        echo "killing process $pid"
 | 
			
		||||
        kill -9 $pid
 | 
			
		||||
        exitcode=2
 | 
			
		||||
    fi
 | 
			
		||||
    wait $pid
 | 
			
		||||
    x=$?
 | 
			
		||||
    if [[ $x -gt $exitcode ]] ; then
 | 
			
		||||
        exitcode=$x
 | 
			
		||||
    fi
 | 
			
		||||
done
 | 
			
		||||
if [[ $exitcode -gt 0 ]] ; then
 | 
			
		||||
    echo "** FAILED **"
 | 
			
		||||
else
 | 
			
		||||
    echo "** OK **"
 | 
			
		||||
fi
 | 
			
		||||
exit $exitcode
 | 
			
		||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue