update wait set and rcl_wait for timers

This commit is contained in:
William Woodall 2015-12-04 17:22:48 -08:00
parent 22761b6596
commit 7ab361a5fb
3 changed files with 200 additions and 51 deletions

View file

@ -122,6 +122,9 @@ rcl_timer_init(
* This function is not thread-safe with any rcl_timer_* functions used on the
* same timer object.
*
* This function is not thread-safe.
* This function is lock-free.
*
* \param[inout] timer the handle to the timer to be finalized.
* \return RCL_RET_OK if the timer was finalized successfully, or
* RCL_RET_ERROR an unspecified error occur.

View file

@ -25,6 +25,7 @@ extern "C"
#include "rcl/subscription.h"
#include "rcl/guard_condition.h"
#include "rcl/timer.h"
#include "rcl/types.h"
struct rcl_wait_set_impl_t;
@ -37,10 +38,9 @@ typedef struct rcl_wait_set_t {
/// Storage for guard condition pointers.
const rcl_guard_condition_t ** guard_conditions;
size_t size_of_guard_conditions;
/// Allocator for storage.
rcl_allocator_t allocator;
/// If set to true, actions like add_subscription will fail until cleared.
bool pruned;
/// Storage for timer pointers.
const rcl_timer_t ** timers;
size_t size_of_timers;
/// Implementation specific storage.
struct rcl_wait_set_impl_t * impl;
} rcl_wait_set_t;
@ -81,6 +81,7 @@ rcl_get_zero_initialized_wait_set();
* \param[inout] wait_set the wait set struct to be initialized
* \param[in] number_of_subscriptions non-zero size of the subscriptions set
* \param[in] number_of_guard_conditions non-zero size of the guard conditions set
* \param[in] number_of_timers non-zero size of the timers set
* \param[in] allocator the allocator to use when allocating space in the sets
* \return RCL_RET_OK if the wait set is initialized successfully, or
* RCL_RET_ALREADY_INIT if the wait set is not zero initialized, or
@ -93,6 +94,7 @@ rcl_wait_set_init(
rcl_wait_set_t * wait_set,
size_t number_of_subscriptions,
size_t number_of_guard_conditions,
size_t number_of_timers,
rcl_allocator_t allocator);
/// Finalize a rcl wait set.
@ -107,6 +109,7 @@ rcl_wait_set_init(
* succeed.
*
* This function is not thread-safe.
* This function is lock-free.
*
* \param[inout] wait_set the wait set struct to be finalized.
* \return RCL_RET_OK if the finalization was successful, or
@ -116,11 +119,28 @@ rcl_wait_set_init(
rcl_ret_t
rcl_wait_set_fini(rcl_wait_set_t * wait_set);
/// Retrieve the wait set's allocator.
/* The allocator must be an allocated rcl_allocator_t struct, as the result is
* copied into this variable.
*
* This function is not thread-safe.
* This function is lock-free.
*
* \param[in] wait_set the handle to the wait set
* \param[out] allocator the rcl_allocator_t struct to which the result is copied
* \return RCL_RET_OK if the allocator was successfully retrieved, or
* RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or
* RCL_RET_ERROR if an unspecified error occurs.
*/
rcl_ret_t
rcl_wait_set_get_allocator(const rcl_wait_set_t * wait_set, rcl_allocator_t * allocator);
/// Stores a pointer to the given subscription in the next empty spot in the set.
/* This function does not guarantee that the subscription is not already in the
* wait set.
*
* This function is not thread-safe.
* This function is lock-free.
*
* \param[inout] wait_set struct in which the subscription is to be stored
* \param[in] subscription the subscription to be added to the wait set
@ -142,6 +162,7 @@ rcl_wait_set_add_subscription(
* Calling this on an uninitialized (zero initialized) wait set will fail.
*
* This function is not thread-safe.
* This function is lock-free.
*
* \param[inout] wait_set struct to have its subscriptions cleared
* \return RCL_RET_OK if cleared successfully, or
@ -196,13 +217,36 @@ rcl_wait_set_add_guard_condition(
rcl_ret_t
rcl_wait_set_clear_guard_conditions(rcl_wait_set_t * wait_set);
/// Reallocates space for the subscriptions in the wait set.
/// Reallocates space for the guard conditions in the wait set.
/* This function behaves exactly the same as for subscriptions.
* \see rcl_wait_set_resize_subscriptions
*/
rcl_ret_t
rcl_wait_set_resize_guard_conditions(rcl_wait_set_t * wait_set, size_t size);
/// Stores a pointer to the timer in the next empty spot in the set.
/* This function behaves exactly the same as for subscriptions.
* \see rcl_wait_set_add_subscription
*/
rcl_ret_t
rcl_wait_set_add_timer(
rcl_wait_set_t * wait_set,
const rcl_timer_t * timer);
/// Removes (sets to NULL) the timers in the wait set.
/* This function behaves exactly the same as for subscriptions.
* \see rcl_wait_set_clear_subscriptions
*/
rcl_ret_t
rcl_wait_set_clear_timers(rcl_wait_set_t * wait_set);
/// Reallocates space for the timers in the wait set.
/* This function behaves exactly the same as for subscriptions.
* \see rcl_wait_set_resize_subscriptions
*/
rcl_ret_t
rcl_wait_set_resize_timers(rcl_wait_set_t * wait_set, size_t size);
/// Block until the wait set is ready or until the timeout has been exceeded.
/* This function will collect the items in the rcl_wait_set_t and pass them
* to the underlying rmw_wait function.

View file

@ -34,6 +34,8 @@ typedef struct rcl_wait_set_impl_t {
rmw_subscriptions_t rmw_subscriptions;
size_t guard_condition_index;
rmw_guard_conditions_t rmw_guard_conditions;
size_t timer_index;
rcl_allocator_t allocator;
} rcl_wait_set_impl_t;
rcl_wait_set_t
@ -44,7 +46,7 @@ rcl_get_zero_initialized_wait_set()
}
static bool
__wait_set_is_valid(rcl_wait_set_t * wait_set)
__wait_set_is_valid(const rcl_wait_set_t * wait_set)
{
return wait_set && wait_set->impl;
}
@ -58,6 +60,9 @@ __wait_set_clean_up(rcl_wait_set_t * wait_set, rcl_allocator_t allocator)
if (wait_set->guard_conditions) {
rcl_wait_set_resize_guard_conditions(wait_set, 0);
}
if (wait_set->timers) {
rcl_wait_set_resize_timers(wait_set, 0);
}
if (wait_set->impl) {
allocator.deallocate(wait_set->impl, allocator.state);
wait_set->impl = NULL;
@ -69,6 +74,7 @@ rcl_wait_set_init(
rcl_wait_set_t * wait_set,
size_t number_of_subscriptions,
size_t number_of_guard_conditions,
size_t number_of_timers,
rcl_allocator_t allocator)
{
rcl_ret_t fail_ret = RCL_RET_ERROR;
@ -118,10 +124,18 @@ rcl_wait_set_init(
fail_ret = ret;
goto fail;
}
// Initialize timer space.
ret = rcl_wait_set_resize_timers(wait_set, number_of_timers);
if (ret != RCL_RET_OK) {
fail_ret = ret;
goto fail;
}
if ((ret = rcl_wait_set_clear_timers(wait_set)) != RCL_RET_OK) {
fail_ret = ret;
goto fail;
}
// Set allocator.
wait_set->allocator = allocator;
// Initialize pruned.
wait_set->pruned = false;
wait_set->impl->allocator = allocator;
return RCL_RET_OK;
fail:
__wait_set_clean_up(wait_set, allocator);
@ -134,12 +148,25 @@ rcl_wait_set_fini(rcl_wait_set_t * wait_set)
rcl_ret_t result = RCL_RET_OK;
RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT);
if (__wait_set_is_valid(wait_set)) {
__wait_set_clean_up(wait_set, wait_set->allocator);
__wait_set_clean_up(wait_set, wait_set->impl->allocator);
}
return result;
}
#define SET_ADD(Type, RMWStorage) \
rcl_ret_t
rcl_wait_set_get_allocator(const rcl_wait_set_t * wait_set, rcl_allocator_t * allocator)
{
RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(allocator, RCL_RET_INVALID_ARGUMENT);
if (!__wait_set_is_valid(wait_set)) {
RCL_SET_ERROR_MSG("wait set is invalid");
return RCL_RET_WAIT_SET_INVALID;
}
*allocator = wait_set->impl->allocator;
return RCL_RET_OK;
}
#define SET_ADD(Type) \
RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT); \
RCL_CHECK_ARGUMENT_FOR_NULL(Type, RCL_RET_INVALID_ARGUMENT); \
if (!__wait_set_is_valid(wait_set)) { \
@ -151,15 +178,16 @@ rcl_wait_set_fini(rcl_wait_set_t * wait_set)
return RCL_RET_WAIT_SET_FULL; \
} \
size_t current_index = wait_set->impl->Type##_index++; \
wait_set->Type##s[current_index] = Type; \
wait_set->Type##s[current_index] = Type;
#define SET_ADD_RMW(Type, RMWStorage) \
/* Also place into rmw storage. */ \
rmw_##Type##_t * rmw_handle = rcl_##Type##_get_rmw_##Type##_handle(Type); \
RCL_CHECK_FOR_NULL_WITH_MSG( \
rmw_handle, rcl_get_error_string_safe(), return RCL_RET_ERROR); \
wait_set->impl->RMWStorage[current_index] = rmw_handle->data; \
return RCL_RET_OK;
wait_set->impl->RMWStorage[current_index] = rmw_handle->data;
#define SET_CLEAR(Type, RMWStorage, RMWCount) \
#define SET_CLEAR(Type) \
RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT); \
if (!__wait_set_is_valid(wait_set)) { \
RCL_SET_ERROR_MSG("wait set is invalid"); \
@ -167,67 +195,79 @@ rcl_wait_set_fini(rcl_wait_set_t * wait_set)
} \
memset(wait_set->Type##s, 0, sizeof(rcl_##Type##_t *) * wait_set->size_of_##Type##s); \
wait_set->impl->Type##_index = 0; \
#define SET_CLEAR_RMW(Type, RMWStorage, RMWCount) \
/* Also clear the rmw storage. */ \
memset( \
wait_set->impl->RMWStorage, \
0, \
sizeof(rmw_subscription_t *) * wait_set->impl->RMWCount); \
return RCL_RET_OK;
sizeof(rmw_subscription_t *) * wait_set->impl->RMWCount);
#define SET_RESIZE(Type, RMWStorage, RMWCount) \
#define SET_RESIZE(Type, ExtraDealloc, ExtraRealloc) \
RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT); \
RCL_CHECK_FOR_NULL_WITH_MSG( \
wait_set->impl, "wait set is invalid", return RCL_RET_WAIT_SET_INVALID); \
if (size == wait_set->size_of_##Type##s) { \
return RCL_RET_OK; \
} \
rcl_allocator_t allocator = wait_set->impl->allocator; \
if (size == 0) { \
if (wait_set->Type##s) { \
wait_set->allocator.deallocate(wait_set->Type##s, wait_set->allocator.state); \
allocator.deallocate(wait_set->Type##s, allocator.state); \
wait_set->Type##s = NULL; \
} \
/* Also deallocate the rmw storage. */ \
if (wait_set->impl->RMWStorage) { \
wait_set->allocator.deallocate(wait_set->impl->RMWStorage, wait_set->allocator.state); \
wait_set->impl->RMWStorage = NULL; \
} \
ExtraDealloc \
} \
else { \
wait_set->size_of_##Type##s = 0; \
wait_set->Type##s = (const rcl_##Type##_t **)wait_set->allocator.reallocate( \
wait_set->Type##s, sizeof(rcl_##Type##_t *) * size, wait_set->allocator.state); \
wait_set->Type##s = (const rcl_##Type##_t **)allocator.reallocate( \
wait_set->Type##s, sizeof(rcl_##Type##_t *) * size, allocator.state); \
RCL_CHECK_FOR_NULL_WITH_MSG( \
wait_set->Type##s, "allocating memory failed", return RCL_RET_BAD_ALLOC); \
wait_set->size_of_##Type##s = size; \
ExtraRealloc \
} \
return RCL_RET_OK;
#define SET_RESIZE_RMW_DEALLOC(RMWStorage, RMWCount) \
/* Also deallocate the rmw storage. */ \
if (wait_set->impl->RMWStorage) { \
allocator.deallocate(wait_set->impl->RMWStorage, allocator.state); \
wait_set->impl->RMWStorage = NULL; \
}
#define SET_RESIZE_RMW_REALLOC(Type, RMWStorage, RMWCount) \
/* Also resize the rmw storage. */ \
wait_set->impl->RMWCount = 0; \
wait_set->impl->RMWStorage = (void **)wait_set->allocator.reallocate( \
wait_set->impl->RMWStorage, sizeof(rcl_##Type##_t *) * size, wait_set->allocator.state); \
wait_set->impl->RMWStorage = (void **)allocator.reallocate( \
wait_set->impl->RMWStorage, sizeof(rcl_##Type##_t *) * size, allocator.state); \
if (!wait_set->impl->RMWStorage) { \
wait_set->allocator.deallocate(wait_set->Type##s, wait_set->allocator.state); \
allocator.deallocate(wait_set->Type##s, allocator.state); \
wait_set->size_of_##Type##s = 0; \
RCL_SET_ERROR_MSG("allocating memory failed"); \
return RCL_RET_BAD_ALLOC; \
} \
wait_set->impl->RMWCount = size; \
} \
return RCL_RET_OK;
wait_set->impl->RMWCount = size;
rcl_ret_t
rcl_wait_set_add_subscription(
rcl_wait_set_t * wait_set,
const rcl_subscription_t * subscription)
{
SET_ADD(subscription, rmw_subscriptions.subscribers)
SET_ADD(subscription)
SET_ADD_RMW(subscription, rmw_subscriptions.subscribers)
return RCL_RET_OK;
}
rcl_ret_t
rcl_wait_set_clear_subscriptions(rcl_wait_set_t * wait_set)
{
SET_CLEAR(
SET_CLEAR(subscription)
SET_CLEAR_RMW(
subscription,
rmw_subscriptions.subscribers,
rmw_subscriptions.subscriber_count)
return RCL_RET_OK;
}
rcl_ret_t
@ -235,8 +275,11 @@ rcl_wait_set_resize_subscriptions(rcl_wait_set_t * wait_set, size_t size)
{
SET_RESIZE(
subscription,
rmw_subscriptions.subscribers,
rmw_subscriptions.subscriber_count)
SET_RESIZE_RMW_DEALLOC(
rmw_subscriptions.subscribers, rmw_subscriptions.subscriber_count),
SET_RESIZE_RMW_REALLOC(
subscription, rmw_subscriptions.subscribers, rmw_subscriptions.subscriber_count)
)
}
rcl_ret_t
@ -244,25 +287,56 @@ rcl_wait_set_add_guard_condition(
rcl_wait_set_t * wait_set,
const rcl_guard_condition_t * guard_condition)
{
SET_ADD(guard_condition, rmw_guard_conditions.guard_conditions)
SET_ADD(guard_condition)
SET_ADD_RMW(guard_condition, rmw_guard_conditions.guard_conditions)
return RCL_RET_OK;
}
rcl_ret_t
rcl_wait_set_clear_guard_conditions(rcl_wait_set_t * wait_set)
{
SET_CLEAR(
SET_CLEAR(guard_condition)
SET_CLEAR_RMW(
guard_condition,
rmw_guard_conditions.guard_conditions,
rmw_guard_conditions.guard_condition_count)
return RCL_RET_OK;
}
rcl_ret_t
rcl_wait_set_resize_guard_conditions(rcl_wait_set_t * wait_set, size_t size)
{
SET_RESIZE(
guard_condition,
SET_RESIZE_RMW_DEALLOC(
rmw_guard_conditions.guard_conditions, rmw_guard_conditions.guard_condition_count),
SET_RESIZE_RMW_REALLOC(
guard_condition,
rmw_guard_conditions.guard_conditions,
rmw_guard_conditions.guard_condition_count)
)
}
rcl_ret_t
rcl_wait_set_add_timer(
rcl_wait_set_t * wait_set,
const rcl_timer_t * timer)
{
SET_ADD(timer)
return RCL_RET_OK;
}
rcl_ret_t
rcl_wait_set_clear_timers(rcl_wait_set_t * wait_set)
{
SET_CLEAR(timer)
return RCL_RET_OK;
}
rcl_ret_t
rcl_wait_set_resize_timers(rcl_wait_set_t * wait_set, size_t size)
{
SET_RESIZE(timer, ;, ;)
}
rcl_ret_t
@ -281,7 +355,24 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
// Create dummy sets for currently unsupported wait-ables.
static rmw_services_t dummy_services = {0, NULL};
static rmw_clients_t dummy_clients = {0, NULL};
rmw_time_t rmw_timeout = rcl_time_from_int64_t_nanoseconds(timeout);
// Calculate the timeout.
int64_t min_timeout = timeout;
if (min_timeout == 0) { // Do not consider timer timeouts if non-blocking.
for (size_t i = 0; i < wait_set->size_of_timers; ++i) {
if (!wait_set->timers[i]) {
continue; // Skip NULL timers.
}
int64_t timer_timeout;
rcl_ret_t ret = rcl_timer_get_time_until_next_call(wait_set->timers[i], &timer_timeout);
if (ret != RCL_RET_OK) {
return ret; // The rcl error state should already be set.
}
if (timer_timeout < min_timeout) {
min_timeout = timer_timeout;
}
}
}
rmw_time_t rmw_timeout = rcl_time_from_int64_t_nanoseconds(min_timeout);
// Wait.
rmw_ret_t ret = rmw_wait(
&wait_set->impl->rmw_subscriptions,
@ -290,6 +381,22 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
&dummy_clients,
&rmw_timeout
);
// Check for error.
if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string_safe());
return RCL_RET_ERROR;
}
// Check for ready timers next, and set not ready timers to NULL.
for (size_t i = 0; i < wait_set->size_of_timers; ++i) {
bool is_ready = false;
rcl_ret_t ret = rcl_timer_is_ready(wait_set->timers[i], &is_ready);
if (ret != RCL_RET_OK) {
return ret; // The rcl error state should already be set.
}
if (!is_ready) {
wait_set->timers[i] = NULL;
}
}
// Check for timeout.
if (ret == RMW_RET_TIMEOUT) {
// Assume none were set (because timeout was reached first), and clear all.
@ -297,11 +404,6 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
rcl_wait_set_clear_guard_conditions(wait_set);
return RCL_RET_TIMEOUT;
}
// Check for error.
if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string_safe());
return RCL_RET_ERROR;
}
// Set corresponding rcl subscription handles NULL.
for (size_t i = 0; i < wait_set->size_of_subscriptions; ++i) {
assert(i < wait_set->impl->rmw_subscriptions.subscriber_count); // Defensive.