use kqueue for input multiplexing on macOS

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2018-08-05 13:09:53 +02:00 committed by eboasson
parent 48d770d40f
commit 0873f4a401
4 changed files with 382 additions and 100 deletions

View file

@ -62,8 +62,10 @@ void os_sockWaitsetTrigger (os_sockWaitset ws);
Closing a connection associated with a waitset is handled gracefully: no Closing a connection associated with a waitset is handled gracefully: no
operations will signal errors because of it. operations will signal errors because of it.
Returns < 0 on error, 0 if already present, 1 if added
*/ */
void os_sockWaitsetAdd (os_sockWaitset ws, struct ddsi_tran_conn * conn); int os_sockWaitsetAdd (os_sockWaitset ws, struct ddsi_tran_conn * conn);
/* /*
Drops all connections from the waitset from index onwards. Index Drops all connections from the waitset from index onwards. Index

View file

@ -47,9 +47,8 @@ void ddsi_tran_factories_fini (void)
ddsi_tran_factory_t factory; ddsi_tran_factory_t factory;
while ((factory = ddsi_tran_factories) != NULL) { while ((factory = ddsi_tran_factories) != NULL) {
ddsi_tran_factories = ddsi_tran_factories->m_factory;
ddsi_factory_free(factory); ddsi_factory_free(factory);
ddsi_tran_factories = ddsi_tran_factories->m_factory;
} }
} }
@ -97,6 +96,9 @@ void ddsi_conn_free (ddsi_tran_conn_t conn)
if (! conn->m_closed) if (! conn->m_closed)
{ {
conn->m_closed = true; conn->m_closed = true;
/* FIXME: rethink the socket waitset & the deleting of entries; the biggest issue is TCP handling that can open & close sockets at will and yet expects the waitset to wake up at the apprioriate times. (This pretty much works with the select-based version, but not the kqueue-based one.) TCP code can also have connections without a socket ... Calling sockWaitsetRemove here (where there shouldn't be any knowledge of it) at least ensures that it is removed in time and that there can't be aliasing of connections and sockets. */
if (ddsi_conn_handle (conn) != Q_INVALID_SOCKET)
os_sockWaitsetRemove (gv.waitset, conn);
if (conn->m_factory->m_close_conn_fn) if (conn->m_factory->m_close_conn_fn)
{ {
(conn->m_factory->m_close_conn_fn) (conn); (conn->m_factory->m_close_conn_fn) (conn);

View file

@ -1440,8 +1440,6 @@ void rtps_term (void)
ut_thread_pool_free (gv.thread_pool); ut_thread_pool_free (gv.thread_pool);
os_sockWaitsetFree (gv.waitset);
(void) joinleave_spdp_defmcip (0); (void) joinleave_spdp_defmcip (0);
ddsi_conn_free (gv.disc_conn_mc); ddsi_conn_free (gv.disc_conn_mc);
@ -1460,6 +1458,7 @@ void rtps_term (void)
free_group_membership(gv.mship); free_group_membership(gv.mship);
ddsi_tran_factories_fini (); ddsi_tran_factories_fini ();
os_sockWaitsetFree (gv.waitset);
if (gv.pcap_fp) if (gv.pcap_fp)
{ {

View file

@ -9,10 +9,6 @@
* *
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/ */
#if defined (WIN32) || defined (OSPL_LINUX)
#define FD_SETSIZE 4096
#endif
#include <assert.h> #include <assert.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -26,15 +22,265 @@
#define WAITSET_DELTA 8 #define WAITSET_DELTA 8
#ifdef __VXWORKS__ #define MODE_KQUEUE 1
#include <pipeDrv.h> #define MODE_SELECT 2
#include <ioLib.h> #define MODE_WFMEVS 3
#include <string.h>
#include <selectLib.h> #if defined __APPLE__
#define OSPL_PIPENAMESIZE 26 #define MODE_SEL MODE_KQUEUE
#elif defined WINCE
#define MODE_SEL MODE_WFMEVS
#else
#define MODE_SEL MODE_SELECT
#endif #endif
#ifdef WINCE #if MODE_SEL == MODE_KQUEUE
#include <unistd.h>
#include <fcntl.h>
#include <sys/fcntl.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
struct os_sockWaitsetCtx
{
struct kevent *evs;
uint32_t nevs;
uint32_t evs_sz;
uint32_t index; /* cursor for enumerating */
};
struct entry {
uint32_t index;
int fd;
ddsi_tran_conn_t conn;
};
struct os_sockWaitset
{
int kqueue;
os_handle pipe[2]; /* pipe used for triggering */
os_atomic_uint32_t sz;
struct entry *entries;
struct os_sockWaitsetCtx ctx; /* set of descriptors being handled */
os_mutex lock; /* for add/delete */
};
static int add_entry_locked (os_sockWaitset ws, ddsi_tran_conn_t conn, int fd)
{
uint32_t idx, fidx, sz, n;
struct kevent kev;
assert (fd >= 0);
sz = os_atomic_ld32 (&ws->sz);
for (idx = 0, fidx = UINT32_MAX, n = 0; idx < sz; idx++)
{
if (ws->entries[idx].fd == -1)
fidx = (idx < fidx) ? idx : fidx;
else if (ws->entries[idx].conn == conn)
return 0;
else
n++;
}
if (fidx == UINT32_MAX)
{
const uint32_t newsz = os_atomic_add32_nv (&ws->sz, WAITSET_DELTA);
ws->entries = os_realloc (ws->entries, newsz * sizeof (*ws->entries));
for (idx = sz; idx < newsz; idx++)
ws->entries[idx].fd = -1;
fidx = sz;
}
EV_SET (&kev, (unsigned)fd, EVFILT_READ, EV_ADD, 0, 0, &ws->entries[fidx]);
if (kevent(ws->kqueue, &kev, 1, NULL, 0, NULL) == -1)
return -1;
ws->entries[fidx].conn = conn;
ws->entries[fidx].fd = fd;
ws->entries[fidx].index = n;
return 1;
}
os_sockWaitset os_sockWaitsetNew (void)
{
const uint32_t sz = WAITSET_DELTA;
os_sockWaitset ws;
uint32_t i;
if ((ws = os_malloc (sizeof (*ws))) == NULL)
goto fail_waitset;
os_atomic_st32 (&ws->sz, sz);
if ((ws->entries = os_malloc (sz * sizeof (*ws->entries))) == NULL)
goto fail_entries;
for (i = 0; i < sz; i++)
ws->entries[i].fd = -1;
ws->ctx.nevs = 0;
ws->ctx.index = 0;
ws->ctx.evs_sz = sz;
if ((ws->ctx.evs = malloc (ws->ctx.evs_sz * sizeof (*ws->ctx.evs))) == NULL)
goto fail_ctx_evs;
if ((ws->kqueue = kqueue ()) == -1)
goto fail_kqueue;
if (pipe (ws->pipe) == -1)
goto fail_pipe;
if (add_entry_locked (ws, NULL, ws->pipe[0]) < 0)
goto fail_add_trigger;
assert (ws->entries[0].fd == ws->pipe[0]);
if (fcntl (ws->kqueue, F_SETFD, fcntl (ws->kqueue, F_GETFD) | FD_CLOEXEC) == -1)
goto fail_fcntl;
if (fcntl (ws->pipe[0], F_SETFD, fcntl (ws->pipe[0], F_GETFD) | FD_CLOEXEC) == -1)
goto fail_fcntl;
if (fcntl (ws->pipe[1], F_SETFD, fcntl (ws->pipe[1], F_GETFD) | FD_CLOEXEC) == -1)
goto fail_fcntl;
os_mutexInit (&ws->lock);
return ws;
fail_fcntl:
fail_add_trigger:
close (ws->pipe[0]);
close (ws->pipe[1]);
fail_pipe:
close (ws->kqueue);
fail_kqueue:
os_free (ws->ctx.evs);
fail_ctx_evs:
os_free (ws->entries);
fail_entries:
os_free (ws);
fail_waitset:
return NULL;
}
void os_sockWaitsetFree (os_sockWaitset ws)
{
os_mutexDestroy (&ws->lock);
close (ws->pipe[0]);
close (ws->pipe[1]);
close (ws->kqueue);
os_free (ws->entries);
os_free (ws->ctx.evs);
os_free (ws);
}
void os_sockWaitsetTrigger (os_sockWaitset ws)
{
char buf = 0;
int n;
int err;
n = (int)write (ws->pipe[1], &buf, 1);
if (n != 1)
{
err = os_getErrno ();
NN_WARNING ("os_sockWaitsetTrigger: read failed on trigger pipe, errno = %d", err);
}
}
int os_sockWaitsetAdd (os_sockWaitset ws, ddsi_tran_conn_t conn)
{
int ret;
os_mutexLock (&ws->lock);
ret = add_entry_locked (ws, conn, ddsi_conn_handle (conn));
os_mutexUnlock (&ws->lock);
return ret;
}
void os_sockWaitsetPurge (os_sockWaitset ws, unsigned index)
{
/* Sockets may have been closed by the Purge is called, but any closed sockets
are automatically deleted from the kqueue and the file descriptors be reused
in the meantime. It therefore seems wiser replace the kqueue then to delete
entries */
uint32_t i, sz;
struct kevent kev;
os_mutexLock (&ws->lock);
sz = os_atomic_ld32 (&ws->sz);
close (ws->kqueue);
if ((ws->kqueue = kqueue()) == -1)
abort (); /* FIXME */
for (i = 0; i <= index; i++)
{
assert (ws->entries[i].fd >= 0);
EV_SET(&kev, (unsigned)ws->entries[i].fd, EVFILT_READ, EV_ADD, 0, 0, &ws->entries[i]);
if (kevent(ws->kqueue, &kev, 1, NULL, 0, NULL) == -1)
abort (); /* FIXME */
}
for (; i < sz; i++)
{
ws->entries[i].conn = NULL;
ws->entries[i].fd = -1;
}
os_mutexUnlock (&ws->lock);
}
void os_sockWaitsetRemove (os_sockWaitset ws, ddsi_tran_conn_t conn)
{
const int fd = ddsi_conn_handle (conn);
uint32_t i, sz;
assert (fd >= 0);
os_mutexLock (&ws->lock);
sz = os_atomic_ld32 (&ws->sz);
for (i = 1; i < sz; i++)
if (ws->entries[i].fd == fd)
break;
if (i < sz)
{
struct kevent kev;
EV_SET(&kev, (unsigned)ws->entries[i].fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
if (kevent(ws->kqueue, &kev, 1, NULL, 0, NULL) == -1)
abort (); /* FIXME */
ws->entries[i].fd = -1;
}
os_mutexUnlock (&ws->lock);
}
os_sockWaitsetCtx os_sockWaitsetWait (os_sockWaitset ws)
{
/* if the array of events is smaller than the number of file descriptors in the
kqueue, things will still work fine, as the kernel will just return what can
be stored, and the set will be grown on the next call */
uint32_t ws_sz = os_atomic_ld32 (&ws->sz);
int nevs;
if (ws->ctx.evs_sz < ws_sz)
{
ws->ctx.evs_sz = ws_sz;
ws->ctx.evs = os_realloc (ws->ctx.evs, ws_sz * sizeof(*ws->ctx.evs));
}
nevs = kevent (ws->kqueue, NULL, 0, ws->ctx.evs, (int)ws->ctx.evs_sz, NULL);
if (nevs < 0)
{
if (errno == EINTR)
nevs = 0;
else
{
NN_WARNING ("os_sockWaitsetWait: kevent failed, errno = %d", os_getErrno());
return NULL;
}
}
ws->ctx.nevs = (uint32_t)nevs;
ws->ctx.index = 0;
return &ws->ctx;
}
int os_sockWaitsetNextEvent (os_sockWaitsetCtx ctx, ddsi_tran_conn_t *conn)
{
while (ctx->index < ctx->nevs)
{
uint32_t idx = ctx->index++;
struct entry * const entry = ctx->evs[idx].udata;
if (entry->index > 0)
{
*conn = entry->conn;
return (int)(entry->index - 1);
}
else
{
/* trigger pipe, read & try again */
char dummy;
read ((int)ctx->evs[idx].ident, &dummy, 1);
}
}
return -1;
}
#elif MODE_SEL == MODE_WFMEVS
struct os_sockWaitsetCtx struct os_sockWaitsetCtx
{ {
@ -121,11 +367,12 @@ void os_sockWaitsetTrigger (os_sockWaitset ws)
} }
} }
void os_sockWaitsetAdd (os_sockWaitset ws, ddsi_tran_conn_t conn) int os_sockWaitsetAdd (os_sockWaitset ws, ddsi_tran_conn_t conn)
{ {
WSAEVENT ev; WSAEVENT ev;
os_socket sock = ddsi_conn_handle (conn); os_socket sock = ddsi_conn_handle (conn);
unsigned idx; unsigned idx;
int ret;
os_mutexLock (&ws->mutex); os_mutexLock (&ws->mutex);
@ -134,25 +381,33 @@ void os_sockWaitsetAdd (os_sockWaitset ws, ddsi_tran_conn_t conn)
if (ws->ctx.conns[idx] == conn) if (ws->ctx.conns[idx] == conn)
break; break;
} }
if (idx == ws->ctx.n) if (idx < ws->ctx.n)
ret = 0;
else
{ {
assert (ws->n < MAXIMUM_WAIT_OBJECTS); assert (ws->n < MAXIMUM_WAIT_OBJECTS);
if ((ev = WSACreateEvent ()) == WSA_INVALID_EVENT)
ev = WSACreateEvent (); ret = -1;
assert (ev != WSA_INVALID_EVENT); else
if (WSAEventSelect (sock, ev, FD_READ) == SOCKET_ERROR)
{ {
NN_WARNING ("os_sockWaitsetAdd: WSAEventSelect(%x,%x) failed, error %d", (os_uint32) sock, (os_uint32) ev, os_getErrno ()); if (WSAEventSelect (sock, ev, FD_READ) == SOCKET_ERROR)
WSACloseEvent (ev); {
assert (0); NN_WARNING ("os_sockWaitsetAdd: WSAEventSelect(%x,%x) failed, error %d", (os_uint32) sock, (os_uint32) ev, os_getErrno ());
WSACloseEvent (ev);
ret = -1;
}
else
{
ws->ctx.conns[ws->ctx.n] = conn;
ws->ctx.events[ws->ctx.n] = ev;
ws->ctx.n++;
ret = 1;
}
} }
ws->ctx.conns[ws->ctx.n] = conn;
ws->ctx.events[ws->ctx.n] = ev;
ws->ctx.n++;
} }
os_mutexUnlock (&ws->mutex); os_mutexUnlock (&ws->mutex);
return ret;
} }
os_sockWaitsetCtx os_sockWaitsetWait (os_sockWaitset ws) os_sockWaitsetCtx os_sockWaitsetWait (os_sockWaitset ws)
@ -204,11 +459,11 @@ os_sockWaitsetCtx os_sockWaitsetWait (os_sockWaitset ws)
} }
/* This implementation follows the pattern of simply looking at the /* This implementation follows the pattern of simply looking at the
socket that triggered the wakeup; alternatively, one could scan the socket that triggered the wakeup; alternatively, one could scan the
entire set as we do for select(). If the likelihood of two sockets entire set as we do for select(). If the likelihood of two sockets
having an event simultaneously is small, this is better, but if it having an event simultaneously is small, this is better, but if it
is large, the lower indices may get a disproportionally large is large, the lower indices may get a disproportionally large
amount of attention. */ amount of attention. */
int os_sockWaitsetNextEvent (os_sockWaitsetCtx ctx, ddsi_tran_conn_t * conn) int os_sockWaitsetNextEvent (os_sockWaitsetCtx ctx, ddsi_tran_conn_t * conn)
{ {
@ -232,7 +487,7 @@ int os_sockWaitsetNextEvent (os_sockWaitsetCtx ctx, ddsi_tran_conn_t * conn)
if (err != WSAENOTSOCK) if (err != WSAENOTSOCK)
{ {
/* May have a wakeup and a close in parallel, so the handle /* May have a wakeup and a close in parallel, so the handle
need not exist anymore. */ need not exist anymore. */
NN_ERROR ("os_sockWaitsetNextEvent: WSAEnumNetworkEvents(%x,%x,...) failed, error %d", (os_uint32) handle, (os_uint32) ctx->events[idx], err); NN_ERROR ("os_sockWaitsetNextEvent: WSAEnumNetworkEvents(%x,%x,...) failed, error %d", (os_uint32) handle, (os_uint32) ctx->events[idx], err);
} }
return -1; return -1;
@ -243,64 +498,17 @@ int os_sockWaitsetNextEvent (os_sockWaitsetCtx ctx, ddsi_tran_conn_t * conn)
} }
} }
#else /* WINCE */ #elif MODE_SEL == MODE_SELECT
#if defined (_WIN32) #ifdef __VXWORKS__
#include <pipeDrv.h>
#include <ioLib.h>
#include <string.h>
#include <selectLib.h>
#define OSPL_PIPENAMESIZE 26
#endif
static int pipe (os_handle fd[2]) #ifndef _WIN32
{
struct sockaddr_in addr;
socklen_t asize = sizeof (addr);
os_socket listener = socket (AF_INET, SOCK_STREAM, 0);
os_socket s1 = socket (AF_INET, SOCK_STREAM, 0);
os_socket s2 = Q_INVALID_SOCKET;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = 0;
if (bind (listener, (struct sockaddr *) &addr, sizeof (addr)) == -1)
{
goto fail;
}
if (getsockname (listener, (struct sockaddr *) &addr, &asize) == -1)
{
goto fail;
}
if (listen (listener, 1) == -1)
{
goto fail;
}
if (connect (s1, (struct sockaddr *) &addr, sizeof (addr)) == -1)
{
goto fail;
}
if ((s2 = accept (listener, 0, 0)) == -1)
{
goto fail;
}
closesocket (listener);
/* Equivalent to FD_CLOEXEC */
SetHandleInformation ((HANDLE) s1, HANDLE_FLAG_INHERIT, 0);
SetHandleInformation ((HANDLE) s2, HANDLE_FLAG_INHERIT, 0);
fd[0] = s1;
fd[1] = s2;
return 0;
fail:
closesocket (listener);
closesocket (s1);
closesocket (s2);
return -1;
}
#else
#ifndef __VXWORKS__ #ifndef __VXWORKS__
#if defined (AIX) || defined (__Lynx__) || defined (__QNX__) #if defined (AIX) || defined (__Lynx__) || defined (__QNX__)
@ -345,6 +553,72 @@ struct os_sockWaitset
struct os_sockWaitsetCtx ctx; /* set of descriptors being handled */ struct os_sockWaitsetCtx ctx; /* set of descriptors being handled */
}; };
#if defined (_WIN32)
static int make_pipe (os_handle fd[2])
{
struct sockaddr_in addr;
socklen_t asize = sizeof (addr);
os_socket listener = socket (AF_INET, SOCK_STREAM, 0);
os_socket s1 = socket (AF_INET, SOCK_STREAM, 0);
os_socket s2 = Q_INVALID_SOCKET;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = 0;
if (bind (listener, (struct sockaddr *)&addr, sizeof (addr)) == -1)
goto fail;
if (getsockname (listener, (struct sockaddr *)&addr, &asize) == -1)
goto fail;
if (listen (listener, 1) == -1)
goto fail;
if (connect (s1, (struct sockaddr *)&addr, sizeof (addr)) == -1)
goto fail;
if ((s2 = accept (listener, 0, 0)) == -1)
goto fail;
closesocket (listener);
/* Equivalent to FD_CLOEXEC */
SetHandleInformation ((HANDLE) s1, HANDLE_FLAG_INHERIT, 0);
SetHandleInformation ((HANDLE) s2, HANDLE_FLAG_INHERIT, 0);
fd[0] = s1;
fd[1] = s2;
return 0;
fail:
closesocket (listener);
closesocket (s1);
closesocket (s2);
return -1;
}
#elif defined (VXWORKS_RTP) || defined (_WRS_KERNEL)
static int make_pipe (int pfd[2])
{
char pipename[OSPL_PIPENAMESIZE];
int pipecount = 0;
do {
snprintf ((char*)&pipename, sizeof (pipename), "/pipe/ospl%d", pipecount++);
} while ((result = pipeDevCreate ((char*)&pipename, 1, 1)) == -1 && os_getErrno() == EINVAL);
if (result == -1)
goto fail_pipedev;
if ((pfd[0] = open ((char*)&pipename, O_RDWR, 0644)) == -1)
goto fail_open0;
if ((pfd[1] = open ((char*)&pipename, O_RDWR, 0644)) == -1)
goto fail_open1;
return 0;
fail_open1:
close (pfd[0]);
fail_open0:
pipeDevDelete (pipename, 0);
fail_pipedev:
return -1;
}
#else
static int make_pipe (int pfd[2])
{
return pipe (pfd);
}
#endif
static void os_sockWaitsetNewSet (os_sockWaitsetSet * set) static void os_sockWaitsetNewSet (os_sockWaitsetSet * set)
{ {
set->fds = os_malloc (WAITSET_DELTA * sizeof (*set->fds)); set->fds = os_malloc (WAITSET_DELTA * sizeof (*set->fds));
@ -374,6 +648,7 @@ os_sockWaitset os_sockWaitsetNew (void)
#endif #endif
#if defined (VXWORKS_RTP) || defined (_WRS_KERNEL) #if defined (VXWORKS_RTP) || defined (_WRS_KERNEL)
int make_pipe (int pfd[2])
{ {
char pipename[OSPL_PIPENAMESIZE]; char pipename[OSPL_PIPENAMESIZE];
int pipecount=0; int pipecount=0;
@ -388,7 +663,7 @@ os_sockWaitset os_sockWaitsetNew (void)
if (result != -1) if (result != -1)
{ {
ws->pipe[0] = result; ws->pipe[0] = result;
result =open ((char*) &pipename, O_RDWR, 0644); result = open ((char*) &pipename, O_RDWR, 0644);
if (result != -1) if (result != -1)
{ {
ws->pipe[1] = result; ws->pipe[1] = result;
@ -402,7 +677,7 @@ os_sockWaitset os_sockWaitsetNew (void)
} }
} }
#else #else
result = pipe (ws->pipe); result = make_pipe (ws->pipe);
#endif #endif
assert (result != -1); assert (result != -1);
(void) result; (void) result;
@ -482,11 +757,12 @@ void os_sockWaitsetTrigger (os_sockWaitset ws)
} }
} }
void os_sockWaitsetAdd (os_sockWaitset ws, ddsi_tran_conn_t conn) int os_sockWaitsetAdd (os_sockWaitset ws, ddsi_tran_conn_t conn)
{ {
os_handle handle = ddsi_conn_handle (conn); os_handle handle = ddsi_conn_handle (conn);
os_sockWaitsetSet * set = &ws->set; os_sockWaitsetSet * set = &ws->set;
unsigned idx; unsigned idx;
int ret;
assert (handle >= 0); assert (handle >= 0);
#if ! defined (_WIN32) #if ! defined (_WIN32)
@ -499,23 +775,23 @@ void os_sockWaitsetAdd (os_sockWaitset ws, ddsi_tran_conn_t conn)
if (set->conns[idx] == conn) if (set->conns[idx] == conn)
break; break;
} }
if (idx == set->n) if (idx < set->n)
ret = 0;
else
{ {
if (set->n == set->sz) if (set->n == set->sz)
{
os_sockWaitsetGrow (set); os_sockWaitsetGrow (set);
}
#if ! defined (_WIN32) #if ! defined (_WIN32)
if ((int) handle >= ws->fdmax_plus_1) if ((int) handle >= ws->fdmax_plus_1)
{
ws->fdmax_plus_1 = handle + 1; ws->fdmax_plus_1 = handle + 1;
}
#endif #endif
set->conns[set->n] = conn; set->conns[set->n] = conn;
set->fds[set->n] = handle; set->fds[set->n] = handle;
set->n++; set->n++;
ret = 1;
} }
os_mutexUnlock (&ws->mutex); os_mutexUnlock (&ws->mutex);
return ret;
} }
void os_sockWaitsetPurge (os_sockWaitset ws, unsigned index) void os_sockWaitsetPurge (os_sockWaitset ws, unsigned index)
@ -655,4 +931,7 @@ int os_sockWaitsetNextEvent (os_sockWaitsetCtx ctx, ddsi_tran_conn_t * conn)
} }
return -1; return -1;
} }
#endif /* WINCE */
#else
#error "no mode selected"
#endif