remove legacy configuration settings

These settings all constitute settings from the long history of the DDSI
stack predating Eclipse Cyclone DDS and can reasonably be presumed never
to have been used in Cyclone.  Their removal is therefore not expected
to break backwards compatibility (which would be anyway be limited to
Cyclone complaining about undefined settings at startup):

* Tracing/Timestamps[@absolute]: has always been ignored

* Tracing/Timestamps: has always been ignored

* General/EnableLoopback: ignored for quite some time, before that
  changing it from the default resulted in crashes.

* General/StartupModeDuration: it did what it advertised (retain data in
  the history caches of volatile writers as-if they were transient-local
  with a durability history setting of keep-last 1 for the first few
  seconds after startup of the DDSI stack) but had no purpose other than
  complicating things as the volatile readers ignored the data anyway.

* General/StartupModeCoversTransient: see previous -- and besides,
  transient data is not supported yet in Cyclone.

* Compatibility/RespondToRtiInitZeroAckWithInvalidHeartbeat: arguably a
  good setting given that DDSI < 2.3 explicitly requires that all
  HEARTBEAT messages sent by a writer advertise the existence of at least
  1 sample, but this has been fixed in DDSI 2.3.  As this requirement was
  never respected by most DDSI implementations, there is no point in
  retaining the setting, while it does remove a rather tricky problem
  immediately after writer startup involving the conjuring up of a
  sample that was annihilated immediately before it could have been
  observed.

  That conjuring up (as it turns out) can cause a malformed message to go
  out (one that is harmless in itself).  Fixing the generation of that
  malformed message while the entire point of the trick is moot in DDSI
  2.3 is a bit silly.

  Note that full DDSI 2.3 compliance needs a bit more work, so not
  bumping the DDSI protocol version number yet.

* Compatibility/AckNackNumbitsEmptySet: changing it from 0 breaks
  compatibility with (at least) RTI Connext, and its reason for
  existence disappers with a fix in DDSI 2.3.

* Internal/AggressiveKeepLastWhc: changing the setting from the default
  made no sense whatsoever in Cyclone -- it would only add flow-control
  and potentially block a keep-last writer where the spec forbids that.

* Internal/LegacyFragmentation: a left-over from almost a decade ago when
  it was discovered that the specification was inconsistent in the use
  of the message header flags for fragmented data, and this stack for a
  while used the non-common interpretation.  There is no reasonable way of
  making the two modes compatible, and this setting merely existed to
  deal with the compatibility issue with some ancient OpenSplice DDS
  version.

* Durability/Encoding: historical junk.

* WatchDog and Lease: never had any function in Cyclone.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-05-04 13:20:30 +08:00 committed by eboasson
parent a7c7ac54c3
commit b5251d0390
13 changed files with 1771 additions and 2140 deletions

View file

@ -313,38 +313,13 @@ dds_writer_qos_set(
static struct whc *make_whc(const dds_qos_t *qos) static struct whc *make_whc(const dds_qos_t *qos)
{ {
bool startup_mode;
bool handle_as_transient_local; bool handle_as_transient_local;
unsigned hdepth, tldepth; unsigned hdepth, tldepth;
/* Startup mode causes the writer to treat data in its WHC as if
transient-local, for the first few seconds after startup of the
DDSI service. It is done for volatile reliable writers only
(which automatically excludes all builtin writers) or for all
writers except volatile best-effort & transient-local ones.
Which one to use depends on whether merge policies are in effect
in durability. If yes, then durability will take care of all
transient & persistent data; if no, DDSI discovery usually takes
too long and this'll save you.
Note: may still be cleared, if it turns out we are not maintaining
an index at all (e.g., volatile KEEP_ALL) */
if (config.startup_mode_full) {
startup_mode = gv.startup_mode &&
(qos->durability.kind >= NN_TRANSIENT_DURABILITY_QOS ||
(qos->durability.kind == NN_VOLATILE_DURABILITY_QOS &&
qos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS));
} else {
startup_mode = gv.startup_mode &&
(qos->durability.kind == NN_VOLATILE_DURABILITY_QOS &&
qos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS);
}
/* Construct WHC -- if aggressive_keep_last1 is set, the WHC will /* Construct WHC -- if aggressive_keep_last1 is set, the WHC will
drop all samples for which a later update is available. This drop all samples for which a later update is available. This
forces it to maintain a tlidx. */ forces it to maintain a tlidx. */
handle_as_transient_local = (qos->durability.kind == NN_TRANSIENT_LOCAL_DURABILITY_QOS); handle_as_transient_local = (qos->durability.kind == NN_TRANSIENT_LOCAL_DURABILITY_QOS);
if (!config.aggressive_keep_last_whc || qos->history.kind == NN_KEEP_ALL_HISTORY_QOS) if (qos->history.kind == NN_KEEP_ALL_HISTORY_QOS)
hdepth = 0; hdepth = 0;
else else
hdepth = (unsigned)qos->history.depth; hdepth = (unsigned)qos->history.depth;
@ -353,8 +328,6 @@ static struct whc *make_whc(const dds_qos_t *qos)
tldepth = 0; tldepth = 0;
else else
tldepth = (unsigned)qos->durability_service.history.depth; tldepth = (unsigned)qos->durability_service.history.depth;
} else if (startup_mode) {
tldepth = (hdepth == 0) ? 1 : hdepth;
} else { } else {
tldepth = 0; tldepth = 0;
} }

View file

@ -237,7 +237,6 @@ struct config
FILE *tracingOutputFile; FILE *tracingOutputFile;
char *tracingOutputFileName; char *tracingOutputFileName;
int tracingTimestamps; int tracingTimestamps;
int tracingRelativeTimestamps;
int tracingAppendToFile; int tracingAppendToFile;
unsigned allowMulticast; unsigned allowMulticast;
enum transport_selector transport_selector; enum transport_selector transport_selector;
@ -254,7 +253,6 @@ struct config
char *assumeMulticastCapable; char *assumeMulticastCapable;
int64_t spdp_interval; int64_t spdp_interval;
int64_t spdp_response_delay_max; int64_t spdp_response_delay_max;
int64_t startup_mode_duration;
int64_t lease_duration; int64_t lease_duration;
int64_t const_hb_intv_sched; int64_t const_hb_intv_sched;
int64_t const_hb_intv_sched_min; int64_t const_hb_intv_sched_min;
@ -263,8 +261,6 @@ struct config
enum retransmit_merging retransmit_merging; enum retransmit_merging retransmit_merging;
int64_t retransmit_merging_period; int64_t retransmit_merging_period;
int squash_participants; int squash_participants;
int startup_mode_full;
int forward_all_messages;
int liveliness_monitoring; int liveliness_monitoring;
int noprogress_log_stacktraces; int noprogress_log_stacktraces;
int64_t liveliness_monitoring_interval; int64_t liveliness_monitoring_interval;
@ -278,10 +274,6 @@ struct config
unsigned delivery_queue_maxsamples; unsigned delivery_queue_maxsamples;
int enableLoopback;
enum durability_cdr durability_cdr;
int buggy_datafrag_flags_mode;
int do_topic_discovery; int do_topic_discovery;
uint32_t max_msg_size; uint32_t max_msg_size;
@ -298,9 +290,7 @@ struct config
int tcp_use_peeraddr_for_unicast; int tcp_use_peeraddr_for_unicast;
#ifdef DDSI_INCLUDE_SSL #ifdef DDSI_INCLUDE_SSL
/* SSL support for TCP */ /* SSL support for TCP */
int ssl_enable; int ssl_enable;
int ssl_verify; int ssl_verify;
int ssl_verify_client; int ssl_verify_client;
@ -310,17 +300,13 @@ struct config
char * ssl_key_pass; char * ssl_key_pass;
char * ssl_ciphers; char * ssl_ciphers;
struct ssl_min_version ssl_min_version; struct ssl_min_version ssl_min_version;
#endif #endif
/* Thread pool configuration */ /* Thread pool configuration */
int tp_enable; int tp_enable;
uint32_t tp_threads; uint32_t tp_threads;
uint32_t tp_max_threads; uint32_t tp_max_threads;
int advertise_builtin_topic_writers;
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS #ifdef DDSI_INCLUDE_NETWORK_CHANNELS
struct config_channel_listelem *channels; struct config_channel_listelem *channels;
struct config_channel_listelem *max_channel; /* channel with highest prio; always computed */ struct config_channel_listelem *max_channel; /* channel with highest prio; always computed */
@ -343,7 +329,6 @@ struct config
uint32_t rmsg_chunk_size; /**<< size of a chunk in the receive buffer */ uint32_t rmsg_chunk_size; /**<< size of a chunk in the receive buffer */
uint32_t rbuf_size; /* << size of a single receiver buffer */ uint32_t rbuf_size; /* << size of a single receiver buffer */
enum besmode besmode; enum besmode besmode;
int aggressive_keep_last_whc;
int conservative_builtin_reader_startup; int conservative_builtin_reader_startup;
int meas_hb_to_ack_latency; int meas_hb_to_ack_latency;
int suppress_spdp_multicast; int suppress_spdp_multicast;
@ -389,8 +374,6 @@ struct config
int explicitly_publish_qos_set_to_default; int explicitly_publish_qos_set_to_default;
enum many_sockets_mode many_sockets_mode; enum many_sockets_mode many_sockets_mode;
int arrival_of_data_asserts_pp_and_ep_liveliness; int arrival_of_data_asserts_pp_and_ep_liveliness;
int acknack_numbits_emptyset;
int respond_to_rti_init_zero_ack_with_invalid_heartbeat;
int assume_rti_has_pmd_endpoints; int assume_rti_has_pmd_endpoints;
int port_dg; int port_dg;
@ -406,14 +389,9 @@ struct config
int initial_deaf; int initial_deaf;
int initial_mute; int initial_mute;
int64_t initial_deaf_mute_reset; int64_t initial_deaf_mute_reset;
int use_multicast_if_mreqn; int use_multicast_if_mreqn;
struct prune_deleted_ppant prune_deleted_ppant; struct prune_deleted_ppant prune_deleted_ppant;
/* not used by ddsi2, only validated; user layer directly accesses
the configuration tree */
ddsrt_sched_t watchdog_sched_class;
int32_t watchdog_sched_priority;
q__schedPrioClass watchdog_sched_priority_class;
}; };
struct ddsi_plugin struct ddsi_plugin

View file

@ -233,8 +233,6 @@ struct writer
enum writer_state state; enum writer_state state;
unsigned reliable: 1; /* iff 1, writer is reliable <=> heartbeat_xevent != NULL */ unsigned reliable: 1; /* iff 1, writer is reliable <=> heartbeat_xevent != NULL */
unsigned handle_as_transient_local: 1; /* controls whether data is retained in WHC */ unsigned handle_as_transient_local: 1; /* controls whether data is retained in WHC */
unsigned aggressive_keep_last: 1; /* controls whether KEEP_LAST will overwrite samples that haven't been ACK'd yet */
unsigned startup_mode: 1; /* causes data to be treated as T-L for a while */
unsigned include_keyhash: 1; /* iff 1, this writer includes a keyhash; keyless topics => include_keyhash = 0 */ unsigned include_keyhash: 1; /* iff 1, this writer includes a keyhash; keyless topics => include_keyhash = 0 */
unsigned retransmitting: 1; /* iff 1, this writer is currently retransmitting */ unsigned retransmitting: 1; /* iff 1, this writer is currently retransmitting */
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
@ -570,7 +568,6 @@ void update_proxy_writer (struct proxy_writer * pwr, struct addrset *as);
int new_proxy_group (const struct nn_guid *guid, const char *name, const struct nn_xqos *xqos, nn_wctime_t timestamp); int new_proxy_group (const struct nn_guid *guid, const char *name, const struct nn_xqos *xqos, nn_wctime_t timestamp);
void delete_proxy_group (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); void delete_proxy_group (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit);
void writer_exit_startup_mode (struct writer *wr);
uint64_t writer_instance_id (const struct nn_guid *guid); uint64_t writer_instance_id (const struct nn_guid *guid);
/* Call this to empty all address sets of all writers to stop all outgoing traffic, or to /* Call this to empty all address sets of all writers to stop all outgoing traffic, or to

View file

@ -223,13 +223,6 @@ struct q_globals {
/* Flag cleared when stopping (receive threads). FIXME. */ /* Flag cleared when stopping (receive threads). FIXME. */
int rtps_keepgoing; int rtps_keepgoing;
/* Startup mode causes data to be treated as transient-local with
depth 1 (i.e., stored in the WHCs and regurgitated on request) to
cover the start-up delay of the discovery protocols. Because all
discovery data is shared, this is strictly a start-up issue of the
service. */
int startup_mode;
/* Start time of the DDSI2 service, for logging relative time stamps, /* Start time of the DDSI2 service, for logging relative time stamps,
should I ever so desire. */ should I ever so desire. */
nn_wctime_t tstart; nn_wctime_t tstart;

View file

@ -29,7 +29,7 @@ inline nn_sequence_number_t toSN (seqno_t n) {
return x; return x;
} }
unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr, int datafrag_as_data); unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr);
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
int WildcardOverlap(char * p1, char * p2); int WildcardOverlap(char * p1, char * p2);

View file

@ -61,7 +61,6 @@ DDS_EXPORT struct xevent *qxev_heartbeat (struct xeventq *evq, nn_mtime_t tsched
DDS_EXPORT struct xevent *qxev_acknack (struct xeventq *evq, nn_mtime_t tsched, const nn_guid_t *pwr_guid, const nn_guid_t *rd_guid); DDS_EXPORT struct xevent *qxev_acknack (struct xeventq *evq, nn_mtime_t tsched, const nn_guid_t *pwr_guid, const nn_guid_t *rd_guid);
DDS_EXPORT struct xevent *qxev_spdp (nn_mtime_t tsched, const nn_guid_t *pp_guid, const nn_guid_t *proxypp_guid); DDS_EXPORT struct xevent *qxev_spdp (nn_mtime_t tsched, const nn_guid_t *pp_guid, const nn_guid_t *proxypp_guid);
DDS_EXPORT struct xevent *qxev_pmd_update (nn_mtime_t tsched, const nn_guid_t *pp_guid); DDS_EXPORT struct xevent *qxev_pmd_update (nn_mtime_t tsched, const nn_guid_t *pp_guid);
DDS_EXPORT struct xevent *qxev_end_startup_mode (nn_mtime_t tsched);
DDS_EXPORT struct xevent *qxev_delete_writer (nn_mtime_t tsched, const nn_guid_t *guid); DDS_EXPORT struct xevent *qxev_delete_writer (nn_mtime_t tsched, const nn_guid_t *guid);
/* cb will be called with now = T_NEVER if the event is still enqueued when when xeventq_free starts cleaning up */ /* cb will be called with now = T_NEVER if the event is still enqueued when when xeventq_free starts cleaning up */

File diff suppressed because it is too large Load diff

View file

@ -1719,7 +1719,7 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str
from the submsg to always conform to that of the "Data" from the submsg to always conform to that of the "Data"
submessage regardless of the input. */ submessage regardless of the input. */
msg = (Data_DataFrag_common_t *) NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_SUBMSG_OFF (fragchain)); msg = (Data_DataFrag_common_t *) NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_SUBMSG_OFF (fragchain));
data_smhdr_flags = normalize_data_datafrag_flags (&msg->smhdr, config.buggy_datafrag_flags_mode); data_smhdr_flags = normalize_data_datafrag_flags (&msg->smhdr);
srcguid.prefix = sampleinfo->rst->src_guid_prefix; srcguid.prefix = sampleinfo->rst->src_guid_prefix;
srcguid.entityid = msg->writerId; srcguid.entityid = msg->writerId;

View file

@ -2657,39 +2657,11 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
{ {
assert (wr->xqos->history.kind == NN_KEEP_LAST_HISTORY_QOS); assert (wr->xqos->history.kind == NN_KEEP_LAST_HISTORY_QOS);
assert (wr->xqos->durability.kind == NN_TRANSIENT_LOCAL_DURABILITY_QOS); assert (wr->xqos->durability.kind == NN_TRANSIENT_LOCAL_DURABILITY_QOS);
wr->aggressive_keep_last = 1;
}
else
{
wr->aggressive_keep_last = (config.aggressive_keep_last_whc && wr->xqos->history.kind == NN_KEEP_LAST_HISTORY_QOS);
} }
wr->handle_as_transient_local = (wr->xqos->durability.kind == NN_TRANSIENT_LOCAL_DURABILITY_QOS); wr->handle_as_transient_local = (wr->xqos->durability.kind == NN_TRANSIENT_LOCAL_DURABILITY_QOS);
wr->include_keyhash = wr->include_keyhash =
config.generate_keyhash && config.generate_keyhash &&
((wr->e.guid.entityid.u & NN_ENTITYID_KIND_MASK) == NN_ENTITYID_KIND_WRITER_WITH_KEY); ((wr->e.guid.entityid.u & NN_ENTITYID_KIND_MASK) == NN_ENTITYID_KIND_WRITER_WITH_KEY);
/* Startup mode causes the writer to treat data in its WHC as if
transient-local, for the first few seconds after startup of the
DDSI service. It is done for volatile reliable writers only
(which automatically excludes all builtin writers) or for all
writers except volatile best-effort & transient-local ones.
Which one to use depends on whether merge policies are in effect
in durability. If yes, then durability will take care of all
transient & persistent data; if no, DDSI discovery usually takes
too long and this'll save you.
Note: may still be cleared, if it turns out we are not maintaining
an index at all (e.g., volatile KEEP_ALL) */
if (config.startup_mode_full) {
wr->startup_mode = gv.startup_mode &&
(wr->xqos->durability.kind >= NN_TRANSIENT_DURABILITY_QOS ||
(wr->xqos->durability.kind == NN_VOLATILE_DURABILITY_QOS &&
wr->xqos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS));
} else {
wr->startup_mode = gv.startup_mode &&
(wr->xqos->durability.kind == NN_VOLATILE_DURABILITY_QOS &&
wr->xqos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS);
}
wr->topic = ddsi_sertopic_ref (topic); wr->topic = ddsi_sertopic_ref (topic);
wr->as = new_addrset (); wr->as = new_addrset ();
wr->as_group = NULL; wr->as_group = NULL;
@ -2784,7 +2756,7 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
wr->lease_duration = T_NEVER; /* FIXME */ wr->lease_duration = T_NEVER; /* FIXME */
wr->whc = whc; wr->whc = whc;
if (wr->xqos->history.kind == NN_KEEP_LAST_HISTORY_QOS && wr->aggressive_keep_last) if (wr->xqos->history.kind == NN_KEEP_LAST_HISTORY_QOS)
{ {
/* hdepth > 0 => "aggressive keep last", and in that case: why /* hdepth > 0 => "aggressive keep last", and in that case: why
bother blocking for a slow receiver when the entire point of bother blocking for a slow receiver when the entire point of
@ -3066,24 +3038,6 @@ int delete_writer (const struct nn_guid *guid)
return 0; return 0;
} }
void writer_exit_startup_mode (struct writer *wr)
{
struct whc_node *deferred_free_list = NULL;
ddsrt_mutex_lock (&wr->e.lock);
if (wr->startup_mode)
{
unsigned cnt = 0;
struct whc_state whcst;
wr->startup_mode = 0;
cnt += remove_acked_messages (wr, &whcst, &deferred_free_list);
cnt += whc_downgrade_to_volatile (wr->whc, &whcst);
writer_clear_retransmitting (wr);
DDS_LOG(DDS_LC_DISCOVERY, " "PGUIDFMT": dropped %u samples\n", PGUID(wr->e.guid), cnt);
}
ddsrt_mutex_unlock (&wr->e.lock);
whc_free_deferred_free_list (wr->whc, deferred_free_list);
}
uint64_t writer_instance_id (const struct nn_guid *guid) uint64_t writer_instance_id (const struct nn_guid *guid)
{ {
struct entity_common *e; struct entity_common *e;

View file

@ -501,12 +501,6 @@ int rtps_config_prep (struct cfgst *cfgst)
if (config.max_participants == 0) if (config.max_participants == 0)
config.max_participants = 100; config.max_participants = 100;
} }
if (NN_STRICT_P)
{
/* Should not be sending invalid messages when strict */
config.respond_to_rti_init_zero_ack_with_invalid_heartbeat = 0;
config.acknack_numbits_emptyset = 1;
}
if (config.max_queued_rexmit_bytes == 0) if (config.max_queued_rexmit_bytes == 0)
{ {
#ifdef DDSI_INCLUDE_BANDWIDTH_LIMITING #ifdef DDSI_INCLUDE_BANDWIDTH_LIMITING
@ -1002,9 +996,6 @@ int rtps_init (void)
} }
#endif #endif
gv.startup_mode = (config.startup_mode_duration > 0) ? 1 : 0;
DDS_LOG(DDS_LC_CONFIG, "startup-mode: %s\n", gv.startup_mode ? "enabled" : "disabled");
(ddsi_plugin.init_fn) (); (ddsi_plugin.init_fn) ();
gv.xmsgpool = nn_xmsgpool_new (); gv.xmsgpool = nn_xmsgpool_new ();
@ -1401,10 +1392,6 @@ int rtps_start (void)
{ {
create_thread (&gv.listen_ts, "listen", (uint32_t (*) (void *)) listen_thread, gv.listener); create_thread (&gv.listen_ts, "listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
} }
if (gv.startup_mode)
{
qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration));
}
if (config.monitor_port >= 0) if (config.monitor_port >= 0)
{ {
gv.debmon = new_debug_monitor (config.monitor_port); gv.debmon = new_debug_monitor (config.monitor_port);

View file

@ -438,7 +438,6 @@ static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, D
static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rmsg, DataFrag_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp) static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rmsg, DataFrag_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp)
{ {
/* on success: sampleinfo->{rst,statusinfo,pt_wr_info_zoff,bswap,complex_qos} all set */ /* on success: sampleinfo->{rst,statusinfo,pt_wr_info_zoff,bswap,complex_qos} all set */
const int interpret_smhdr_flags_asif_data = config.buggy_datafrag_flags_mode;
uint32_t payloadsz; uint32_t payloadsz;
nn_guid_t pwr_guid; nn_guid_t pwr_guid;
unsigned char *ptr; unsigned char *ptr;
@ -446,14 +445,6 @@ static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rms
if (size < sizeof (*msg)) if (size < sizeof (*msg))
return 0; /* too small even for fixed fields */ return 0; /* too small even for fixed fields */
if (interpret_smhdr_flags_asif_data)
{
/* D=1 && K=1 is invalid in this version of the protocol */
if ((msg->x.smhdr.flags & (DATA_FLAG_DATAFLAG | DATA_FLAG_KEYFLAG)) ==
(DATA_FLAG_DATAFLAG | DATA_FLAG_KEYFLAG))
return 0;
}
if (byteswap) if (byteswap)
{ {
msg->x.extraFlags = bswap2u (msg->x.extraFlags); msg->x.extraFlags = bswap2u (msg->x.extraFlags);
@ -497,13 +488,6 @@ static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rms
if (sampleinfo->seq <= 0 && sampleinfo->seq != NN_SEQUENCE_NUMBER_UNKNOWN) if (sampleinfo->seq <= 0 && sampleinfo->seq != NN_SEQUENCE_NUMBER_UNKNOWN)
return 0; return 0;
if (interpret_smhdr_flags_asif_data)
{
if ((msg->x.smhdr.flags & (DATA_FLAG_DATAFLAG | DATA_FLAG_KEYFLAG)) == 0)
/* may not fragment if not needed => surely _some_ payload must be present! */
return 0;
}
/* QoS and/or payload, so octetsToInlineQos must be within the msg; /* QoS and/or payload, so octetsToInlineQos must be within the msg;
since the serialized data and serialized parameter lists have a 4 since the serialized data and serialized parameter lists have a 4
byte header, that one, too must fit */ byte header, that one, too must fit */
@ -607,36 +591,10 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *
return; return;
} }
if (WHCST_ISEMPTY(whcst) && !config.respond_to_rti_init_zero_ack_with_invalid_heartbeat) /* Send a Heartbeat just to this peer */
{ add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 0);
/* If WHC is empty, we send a Gap combined with a Heartbeat. The DDS_TRACE("force_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n",
Gap reuses the latest sequence number (or consumes a new one if
the writer hasn't sent anything yet), therefore for the reader
it is as-if a Data submessage had once been sent with that
sequence number and it now receives an unsollicited response to
a NACK ... */
uint32_t bits = 0;
seqno_t seq;
if (wr->seq > 0)
seq = wr->seq;
else
{
/* never sent anything, pretend we did */
seq = wr->seq = 1;
UPDATE_SEQ_XMIT_LOCKED(wr, 1);
}
add_Gap (m, wr, prd, seq, seq+1, 1, &bits);
add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 1);
DDS_TRACE("force_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - whc empty, queueing gap #%"PRId64" + heartbeat for transmit\n",
PGUID (wr->e.guid), PGUID (prd->e.guid), seq);
}
else
{
/* Send a Heartbeat just to this peer */
add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 0);
DDS_TRACE("force_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n",
PGUID (wr->e.guid), PGUID (prd->e.guid)); PGUID (wr->e.guid), PGUID (prd->e.guid));
}
qxev_msg (wr->evq, m); qxev_msg (wr->evq, m);
} }
@ -1892,16 +1850,13 @@ static struct ddsi_serdata *extract_sample_from_data
return sample; return sample;
} }
unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr, int datafrag_as_data) unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr)
{ {
switch ((SubmessageKind_t) smhdr->submessageId) switch ((SubmessageKind_t) smhdr->submessageId)
{ {
case SMID_DATA: case SMID_DATA:
return smhdr->flags; return smhdr->flags;
case SMID_DATA_FRAG: case SMID_DATA_FRAG:
if (datafrag_as_data)
return smhdr->flags;
else
{ {
unsigned char common = smhdr->flags & DATA_FLAG_INLINE_QOS; unsigned char common = smhdr->flags & DATA_FLAG_INLINE_QOS;
Q_STATIC_ASSERT_CODE (DATA_FLAG_INLINE_QOS == DATAFRAG_FLAG_INLINE_QOS); Q_STATIC_ASSERT_CODE (DATA_FLAG_INLINE_QOS == DATAFRAG_FLAG_INLINE_QOS);
@ -1955,7 +1910,7 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
DataFrag header, so for the fixed-position things that we're DataFrag header, so for the fixed-position things that we're
interested in here, both can be treated as Data submessages. */ interested in here, both can be treated as Data submessages. */
msg = (Data_DataFrag_common_t *) NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_SUBMSG_OFF (fragchain)); msg = (Data_DataFrag_common_t *) NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_SUBMSG_OFF (fragchain));
data_smhdr_flags = normalize_data_datafrag_flags (&msg->smhdr, config.buggy_datafrag_flags_mode); data_smhdr_flags = normalize_data_datafrag_flags (&msg->smhdr);
/* Extract QoS's to the extent necessary. The expected case has all /* Extract QoS's to the extent necessary. The expected case has all
we need predecoded into a few bits in the sample info. we need predecoded into a few bits in the sample info.

View file

@ -352,13 +352,8 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
We're not really allowed to generate heartbeats when the WHC is We're not really allowed to generate heartbeats when the WHC is
empty, but it appears RTI sort-of needs them ... Now we use empty, but it appears RTI sort-of needs them ... Now we use
GAPs, and allocate a sequence number specially for that. */ GAPs, and allocate a sequence number specially for that. */
assert (config.respond_to_rti_init_zero_ack_with_invalid_heartbeat || wr->seq >= 1);
max = wr->seq; max = wr->seq;
min = max; min = max + 1;
if (config.respond_to_rti_init_zero_ack_with_invalid_heartbeat)
{
min += 1;
}
} }
else else
{ {
@ -378,14 +373,9 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
if (seq_xmit >= min) { if (seq_xmit >= min) {
/* Advertise some but not all data */ /* Advertise some but not all data */
max = seq_xmit; max = seq_xmit;
} else if (config.respond_to_rti_init_zero_ack_with_invalid_heartbeat) { } else {
/* if we can generate an empty heartbeat => do so. */ /* if we can generate an empty heartbeat => do so. */
max = min - 1; max = min - 1;
} else {
/* claim the existence of a sample we possibly haven't set
yet, at worst this causes a retransmission (but the
NackDelay usually takes care of that). */
max = min;
} }
} }
} }
@ -484,10 +474,7 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
Note: fragnum is 0-based here, 1-based in DDSI. But 0-based is Note: fragnum is 0-based here, 1-based in DDSI. But 0-based is
much easier ... much easier ...
Expected inline QoS size: header(4) + statusinfo(8) + keyhash(20) actual expected_inline_qos_size is typically 0, but always claiming 32 bytes won't make
+ sentinel(4). Plus some spare cos I can't be bothered. */
const int set_smhdr_flags_asif_data = config.buggy_datafrag_flags_mode;
/* actual expected_inline_qos_size is typically 0, but always claiming 32 bytes won't make
a difference, so no point in being precise */ a difference, so no point in being precise */
const size_t expected_inline_qos_size = /* statusinfo */ 8 + /* keyhash */ 20 + /* sentinel */ 4; const size_t expected_inline_qos_size = /* statusinfo */ 8 + /* keyhash */ 20 + /* sentinel */ 4;
struct nn_xmsg_marker sm_marker; struct nn_xmsg_marker sm_marker;
@ -569,10 +556,7 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
} }
else else
{ {
const unsigned char contentflag = const unsigned char contentflag = (serdata->kind == SDK_KEY ? DATAFRAG_FLAG_KEYFLAG : 0);
set_smhdr_flags_asif_data
? (serdata->kind == SDK_KEY ? DATA_FLAG_KEYFLAG : DATA_FLAG_DATAFLAG)
: (serdata->kind == SDK_KEY ? DATAFRAG_FLAG_KEYFLAG : 0);
DataFrag_t *frag = sm; DataFrag_t *frag = sm;
/* empty means size = 0, which means it never needs fragmenting */ /* empty means size = 0, which means it never needs fragmenting */
assert (serdata->kind != SDK_EMPTY); assert (serdata->kind != SDK_EMPTY);
@ -887,7 +871,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
if (wr->reliable && have_reliable_subs (wr)) if (wr->reliable && have_reliable_subs (wr))
do_insert = 1; do_insert = 1;
else if (wr->handle_as_transient_local || wr->startup_mode) else if (wr->handle_as_transient_local)
do_insert = 1; do_insert = 1;
else else
do_insert = 0; do_insert = 0;
@ -1041,7 +1025,7 @@ static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack *
nn_mtime_t tnow; nn_mtime_t tnow;
/* If GC not allowed, we must be sure to never block when writing. That is only the case for (true, aggressive) KEEP_LAST writers, and also only if there is no limit to how much unacknowledged data the WHC may contain. */ /* If GC not allowed, we must be sure to never block when writing. That is only the case for (true, aggressive) KEEP_LAST writers, and also only if there is no limit to how much unacknowledged data the WHC may contain. */
assert(gc_allowed || (wr->xqos->history.kind == NN_KEEP_LAST_HISTORY_QOS && wr->aggressive_keep_last && wr->whc_low == INT32_MAX)); assert(gc_allowed || (wr->xqos->history.kind == NN_KEEP_LAST_HISTORY_QOS && wr->whc_low == INT32_MAX));
(void)gc_allowed; (void)gc_allowed;
if (ddsi_serdata_size (serdata) > config.max_sample_size) if (ddsi_serdata_size (serdata) > config.max_sample_size)

View file

@ -63,7 +63,6 @@ enum xeventkind
XEVK_ACKNACK, XEVK_ACKNACK,
XEVK_SPDP, XEVK_SPDP,
XEVK_PMD_UPDATE, XEVK_PMD_UPDATE,
XEVK_END_STARTUP_MODE,
XEVK_DELETE_WRITER, XEVK_DELETE_WRITER,
XEVK_CALLBACK XEVK_CALLBACK
}; };
@ -93,10 +92,6 @@ struct xevent
#if 0 #if 0
struct { struct {
} info; } info;
#endif
#if 0
struct {
} end_startup_mode;
#endif #endif
struct { struct {
nn_guid_t guid; nn_guid_t guid;
@ -312,7 +307,6 @@ static void free_xevent (struct xeventq *evq, struct xevent *ev)
case XEVK_ACKNACK: case XEVK_ACKNACK:
case XEVK_SPDP: case XEVK_SPDP:
case XEVK_PMD_UPDATE: case XEVK_PMD_UPDATE:
case XEVK_END_STARTUP_MODE:
case XEVK_DELETE_WRITER: case XEVK_DELETE_WRITER:
case XEVK_CALLBACK: case XEVK_CALLBACK:
break; break;
@ -805,15 +799,6 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
an->smhdr.flags |= ACKNACK_FLAG_FINAL; an->smhdr.flags |= ACKNACK_FLAG_FINAL;
} }
/* If we refuse to send invalid AckNacks, grow a length-0 bitmap and
zero-fill it. Cleared bits are meaningless (DDSI 2.1, table 8.33,
although RTI seems to think otherwise). */
if (numbits == 0 && config.acknack_numbits_emptyset > 0)
{
an->readerSNState.numbits = (unsigned) config.acknack_numbits_emptyset;
nn_bitset_zero (an->readerSNState.numbits, an->readerSNState.bits);
}
{ {
/* Count field is at a variable offset ... silly DDSI spec. */ /* Count field is at a variable offset ... silly DDSI spec. */
nn_count_t *countp = nn_count_t *countp =
@ -1195,21 +1180,6 @@ static void handle_xevk_pmd_update (struct thread_state1 * const ts1, struct nn_
ddsrt_mutex_unlock (&pp->e.lock); ddsrt_mutex_unlock (&pp->e.lock);
} }
static void handle_xevk_end_startup_mode (UNUSED_ARG (struct nn_xpack *xp), struct xevent *ev, UNUSED_ARG (nn_mtime_t tnow))
{
struct ephash_enum_writer est;
struct writer *wr;
assert (gv.startup_mode);
DDS_LOG(DDS_LC_DISCOVERY, "end startup mode\n");
gv.startup_mode = 0;
/* FIXME: MEMBAR needed for startup mode (or use a lock) */
ephash_enum_writer_init (&est);
while ((wr = ephash_enum_writer_next (&est)) != NULL)
writer_exit_startup_mode (wr);
ephash_enum_writer_fini (&est);
delete_xevent (ev);
}
static void handle_xevk_delete_writer (UNUSED_ARG (struct nn_xpack *xp), struct xevent *ev, UNUSED_ARG (nn_mtime_t tnow)) static void handle_xevk_delete_writer (UNUSED_ARG (struct nn_xpack *xp), struct xevent *ev, UNUSED_ARG (nn_mtime_t tnow))
{ {
/* don't worry if the writer is already gone by the time we get here. */ /* don't worry if the writer is already gone by the time we get here. */
@ -1234,9 +1204,6 @@ static void handle_individual_xevent (struct thread_state1 * const ts1, struct x
case XEVK_PMD_UPDATE: case XEVK_PMD_UPDATE:
handle_xevk_pmd_update (ts1, xp, xev, tnow); handle_xevk_pmd_update (ts1, xp, xev, tnow);
break; break;
case XEVK_END_STARTUP_MODE:
handle_xevk_end_startup_mode (xp, xev, tnow);
break;
case XEVK_DELETE_WRITER: case XEVK_DELETE_WRITER:
handle_xevk_delete_writer (xp, xev, tnow); handle_xevk_delete_writer (xp, xev, tnow);
break; break;
@ -1582,16 +1549,6 @@ struct xevent *qxev_pmd_update (nn_mtime_t tsched, const nn_guid_t *pp_guid)
return ev; return ev;
} }
struct xevent *qxev_end_startup_mode (nn_mtime_t tsched)
{
struct xevent *ev;
ddsrt_mutex_lock (&gv.xevents->lock);
ev = qxev_common (gv.xevents, tsched, XEVK_END_STARTUP_MODE);
qxev_insert (ev);
ddsrt_mutex_unlock (&gv.xevents->lock);
return ev;
}
struct xevent *qxev_delete_writer (nn_mtime_t tsched, const nn_guid_t *guid) struct xevent *qxev_delete_writer (nn_mtime_t tsched, const nn_guid_t *guid)
{ {
struct xevent *ev; struct xevent *ev;