From 7ab361a5fb62937e10200bf38c786f6c364bdd7f Mon Sep 17 00:00:00 2001 From: William Woodall Date: Fri, 4 Dec 2015 17:22:48 -0800 Subject: [PATCH] update wait set and rcl_wait for timers --- rcl/include/rcl/timer.h | 3 + rcl/include/rcl/wait.h | 54 +++++++++-- rcl/src/rcl/wait.c | 194 ++++++++++++++++++++++++++++++---------- 3 files changed, 200 insertions(+), 51 deletions(-) diff --git a/rcl/include/rcl/timer.h b/rcl/include/rcl/timer.h index d273ed5..ba62062 100644 --- a/rcl/include/rcl/timer.h +++ b/rcl/include/rcl/timer.h @@ -122,6 +122,9 @@ rcl_timer_init( * This function is not thread-safe with any rcl_timer_* functions used on the * same timer object. * + * This function is not thread-safe. + * This function is lock-free. + * * \param[inout] timer the handle to the timer to be finalized. * \return RCL_RET_OK if the timer was finalized successfully, or * RCL_RET_ERROR an unspecified error occur. diff --git a/rcl/include/rcl/wait.h b/rcl/include/rcl/wait.h index b86d251..eb75555 100644 --- a/rcl/include/rcl/wait.h +++ b/rcl/include/rcl/wait.h @@ -25,6 +25,7 @@ extern "C" #include "rcl/subscription.h" #include "rcl/guard_condition.h" +#include "rcl/timer.h" #include "rcl/types.h" struct rcl_wait_set_impl_t; @@ -37,10 +38,9 @@ typedef struct rcl_wait_set_t { /// Storage for guard condition pointers. const rcl_guard_condition_t ** guard_conditions; size_t size_of_guard_conditions; - /// Allocator for storage. - rcl_allocator_t allocator; - /// If set to true, actions like add_subscription will fail until cleared. - bool pruned; + /// Storage for timer pointers. + const rcl_timer_t ** timers; + size_t size_of_timers; /// Implementation specific storage. struct rcl_wait_set_impl_t * impl; } rcl_wait_set_t; @@ -81,6 +81,7 @@ rcl_get_zero_initialized_wait_set(); * \param[inout] wait_set the wait set struct to be initialized * \param[in] number_of_subscriptions non-zero size of the subscriptions set * \param[in] number_of_guard_conditions non-zero size of the guard conditions set + * \param[in] number_of_timers non-zero size of the timers set * \param[in] allocator the allocator to use when allocating space in the sets * \return RCL_RET_OK if the wait set is initialized successfully, or * RCL_RET_ALREADY_INIT if the wait set is not zero initialized, or @@ -93,6 +94,7 @@ rcl_wait_set_init( rcl_wait_set_t * wait_set, size_t number_of_subscriptions, size_t number_of_guard_conditions, + size_t number_of_timers, rcl_allocator_t allocator); /// Finalize a rcl wait set. @@ -107,6 +109,7 @@ rcl_wait_set_init( * succeed. * * This function is not thread-safe. + * This function is lock-free. * * \param[inout] wait_set the wait set struct to be finalized. * \return RCL_RET_OK if the finalization was successful, or @@ -116,11 +119,28 @@ rcl_wait_set_init( rcl_ret_t rcl_wait_set_fini(rcl_wait_set_t * wait_set); +/// Retrieve the wait set's allocator. +/* The allocator must be an allocated rcl_allocator_t struct, as the result is + * copied into this variable. + * + * This function is not thread-safe. + * This function is lock-free. + * + * \param[in] wait_set the handle to the wait set + * \param[out] allocator the rcl_allocator_t struct to which the result is copied + * \return RCL_RET_OK if the allocator was successfully retrieved, or + * RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or + * RCL_RET_ERROR if an unspecified error occurs. + */ +rcl_ret_t +rcl_wait_set_get_allocator(const rcl_wait_set_t * wait_set, rcl_allocator_t * allocator); + /// Stores a pointer to the given subscription in the next empty spot in the set. /* This function does not guarantee that the subscription is not already in the * wait set. * * This function is not thread-safe. + * This function is lock-free. * * \param[inout] wait_set struct in which the subscription is to be stored * \param[in] subscription the subscription to be added to the wait set @@ -142,6 +162,7 @@ rcl_wait_set_add_subscription( * Calling this on an uninitialized (zero initialized) wait set will fail. * * This function is not thread-safe. + * This function is lock-free. * * \param[inout] wait_set struct to have its subscriptions cleared * \return RCL_RET_OK if cleared successfully, or @@ -196,13 +217,36 @@ rcl_wait_set_add_guard_condition( rcl_ret_t rcl_wait_set_clear_guard_conditions(rcl_wait_set_t * wait_set); -/// Reallocates space for the subscriptions in the wait set. +/// Reallocates space for the guard conditions in the wait set. /* This function behaves exactly the same as for subscriptions. * \see rcl_wait_set_resize_subscriptions */ rcl_ret_t rcl_wait_set_resize_guard_conditions(rcl_wait_set_t * wait_set, size_t size); +/// Stores a pointer to the timer in the next empty spot in the set. +/* This function behaves exactly the same as for subscriptions. + * \see rcl_wait_set_add_subscription + */ +rcl_ret_t +rcl_wait_set_add_timer( + rcl_wait_set_t * wait_set, + const rcl_timer_t * timer); + +/// Removes (sets to NULL) the timers in the wait set. +/* This function behaves exactly the same as for subscriptions. + * \see rcl_wait_set_clear_subscriptions + */ +rcl_ret_t +rcl_wait_set_clear_timers(rcl_wait_set_t * wait_set); + +/// Reallocates space for the timers in the wait set. +/* This function behaves exactly the same as for subscriptions. + * \see rcl_wait_set_resize_subscriptions + */ +rcl_ret_t +rcl_wait_set_resize_timers(rcl_wait_set_t * wait_set, size_t size); + /// Block until the wait set is ready or until the timeout has been exceeded. /* This function will collect the items in the rcl_wait_set_t and pass them * to the underlying rmw_wait function. diff --git a/rcl/src/rcl/wait.c b/rcl/src/rcl/wait.c index c892d73..c13cf38 100644 --- a/rcl/src/rcl/wait.c +++ b/rcl/src/rcl/wait.c @@ -34,6 +34,8 @@ typedef struct rcl_wait_set_impl_t { rmw_subscriptions_t rmw_subscriptions; size_t guard_condition_index; rmw_guard_conditions_t rmw_guard_conditions; + size_t timer_index; + rcl_allocator_t allocator; } rcl_wait_set_impl_t; rcl_wait_set_t @@ -44,7 +46,7 @@ rcl_get_zero_initialized_wait_set() } static bool -__wait_set_is_valid(rcl_wait_set_t * wait_set) +__wait_set_is_valid(const rcl_wait_set_t * wait_set) { return wait_set && wait_set->impl; } @@ -58,6 +60,9 @@ __wait_set_clean_up(rcl_wait_set_t * wait_set, rcl_allocator_t allocator) if (wait_set->guard_conditions) { rcl_wait_set_resize_guard_conditions(wait_set, 0); } + if (wait_set->timers) { + rcl_wait_set_resize_timers(wait_set, 0); + } if (wait_set->impl) { allocator.deallocate(wait_set->impl, allocator.state); wait_set->impl = NULL; @@ -69,6 +74,7 @@ rcl_wait_set_init( rcl_wait_set_t * wait_set, size_t number_of_subscriptions, size_t number_of_guard_conditions, + size_t number_of_timers, rcl_allocator_t allocator) { rcl_ret_t fail_ret = RCL_RET_ERROR; @@ -118,10 +124,18 @@ rcl_wait_set_init( fail_ret = ret; goto fail; } + // Initialize timer space. + ret = rcl_wait_set_resize_timers(wait_set, number_of_timers); + if (ret != RCL_RET_OK) { + fail_ret = ret; + goto fail; + } + if ((ret = rcl_wait_set_clear_timers(wait_set)) != RCL_RET_OK) { + fail_ret = ret; + goto fail; + } // Set allocator. - wait_set->allocator = allocator; - // Initialize pruned. - wait_set->pruned = false; + wait_set->impl->allocator = allocator; return RCL_RET_OK; fail: __wait_set_clean_up(wait_set, allocator); @@ -134,12 +148,25 @@ rcl_wait_set_fini(rcl_wait_set_t * wait_set) rcl_ret_t result = RCL_RET_OK; RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT); if (__wait_set_is_valid(wait_set)) { - __wait_set_clean_up(wait_set, wait_set->allocator); + __wait_set_clean_up(wait_set, wait_set->impl->allocator); } return result; } -#define SET_ADD(Type, RMWStorage) \ +rcl_ret_t +rcl_wait_set_get_allocator(const rcl_wait_set_t * wait_set, rcl_allocator_t * allocator) +{ + RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(allocator, RCL_RET_INVALID_ARGUMENT); + if (!__wait_set_is_valid(wait_set)) { + RCL_SET_ERROR_MSG("wait set is invalid"); + return RCL_RET_WAIT_SET_INVALID; + } + *allocator = wait_set->impl->allocator; + return RCL_RET_OK; +} + +#define SET_ADD(Type) \ RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT); \ RCL_CHECK_ARGUMENT_FOR_NULL(Type, RCL_RET_INVALID_ARGUMENT); \ if (!__wait_set_is_valid(wait_set)) { \ @@ -151,15 +178,16 @@ rcl_wait_set_fini(rcl_wait_set_t * wait_set) return RCL_RET_WAIT_SET_FULL; \ } \ size_t current_index = wait_set->impl->Type##_index++; \ - wait_set->Type##s[current_index] = Type; \ + wait_set->Type##s[current_index] = Type; + +#define SET_ADD_RMW(Type, RMWStorage) \ /* Also place into rmw storage. */ \ rmw_##Type##_t * rmw_handle = rcl_##Type##_get_rmw_##Type##_handle(Type); \ RCL_CHECK_FOR_NULL_WITH_MSG( \ rmw_handle, rcl_get_error_string_safe(), return RCL_RET_ERROR); \ - wait_set->impl->RMWStorage[current_index] = rmw_handle->data; \ - return RCL_RET_OK; + wait_set->impl->RMWStorage[current_index] = rmw_handle->data; -#define SET_CLEAR(Type, RMWStorage, RMWCount) \ +#define SET_CLEAR(Type) \ RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT); \ if (!__wait_set_is_valid(wait_set)) { \ RCL_SET_ERROR_MSG("wait set is invalid"); \ @@ -167,67 +195,79 @@ rcl_wait_set_fini(rcl_wait_set_t * wait_set) } \ memset(wait_set->Type##s, 0, sizeof(rcl_##Type##_t *) * wait_set->size_of_##Type##s); \ wait_set->impl->Type##_index = 0; \ + +#define SET_CLEAR_RMW(Type, RMWStorage, RMWCount) \ /* Also clear the rmw storage. */ \ memset( \ wait_set->impl->RMWStorage, \ 0, \ - sizeof(rmw_subscription_t *) * wait_set->impl->RMWCount); \ - return RCL_RET_OK; + sizeof(rmw_subscription_t *) * wait_set->impl->RMWCount); -#define SET_RESIZE(Type, RMWStorage, RMWCount) \ +#define SET_RESIZE(Type, ExtraDealloc, ExtraRealloc) \ RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT); \ RCL_CHECK_FOR_NULL_WITH_MSG( \ wait_set->impl, "wait set is invalid", return RCL_RET_WAIT_SET_INVALID); \ if (size == wait_set->size_of_##Type##s) { \ return RCL_RET_OK; \ } \ + rcl_allocator_t allocator = wait_set->impl->allocator; \ if (size == 0) { \ if (wait_set->Type##s) { \ - wait_set->allocator.deallocate(wait_set->Type##s, wait_set->allocator.state); \ + allocator.deallocate(wait_set->Type##s, allocator.state); \ wait_set->Type##s = NULL; \ } \ - /* Also deallocate the rmw storage. */ \ - if (wait_set->impl->RMWStorage) { \ - wait_set->allocator.deallocate(wait_set->impl->RMWStorage, wait_set->allocator.state); \ - wait_set->impl->RMWStorage = NULL; \ - } \ + ExtraDealloc \ } \ else { \ wait_set->size_of_##Type##s = 0; \ - wait_set->Type##s = (const rcl_##Type##_t **)wait_set->allocator.reallocate( \ - wait_set->Type##s, sizeof(rcl_##Type##_t *) * size, wait_set->allocator.state); \ + wait_set->Type##s = (const rcl_##Type##_t **)allocator.reallocate( \ + wait_set->Type##s, sizeof(rcl_##Type##_t *) * size, allocator.state); \ RCL_CHECK_FOR_NULL_WITH_MSG( \ wait_set->Type##s, "allocating memory failed", return RCL_RET_BAD_ALLOC); \ wait_set->size_of_##Type##s = size; \ - /* Also resize the rmw storage. */ \ - wait_set->impl->RMWCount = 0; \ - wait_set->impl->RMWStorage = (void **)wait_set->allocator.reallocate( \ - wait_set->impl->RMWStorage, sizeof(rcl_##Type##_t *) * size, wait_set->allocator.state); \ - if (!wait_set->impl->RMWStorage) { \ - wait_set->allocator.deallocate(wait_set->Type##s, wait_set->allocator.state); \ - wait_set->size_of_##Type##s = 0; \ - RCL_SET_ERROR_MSG("allocating memory failed"); \ - return RCL_RET_BAD_ALLOC; \ - } \ - wait_set->impl->RMWCount = size; \ + ExtraRealloc \ } \ return RCL_RET_OK; +#define SET_RESIZE_RMW_DEALLOC(RMWStorage, RMWCount) \ + /* Also deallocate the rmw storage. */ \ + if (wait_set->impl->RMWStorage) { \ + allocator.deallocate(wait_set->impl->RMWStorage, allocator.state); \ + wait_set->impl->RMWStorage = NULL; \ + } + +#define SET_RESIZE_RMW_REALLOC(Type, RMWStorage, RMWCount) \ + /* Also resize the rmw storage. */ \ + wait_set->impl->RMWCount = 0; \ + wait_set->impl->RMWStorage = (void **)allocator.reallocate( \ + wait_set->impl->RMWStorage, sizeof(rcl_##Type##_t *) * size, allocator.state); \ + if (!wait_set->impl->RMWStorage) { \ + allocator.deallocate(wait_set->Type##s, allocator.state); \ + wait_set->size_of_##Type##s = 0; \ + RCL_SET_ERROR_MSG("allocating memory failed"); \ + return RCL_RET_BAD_ALLOC; \ + } \ + wait_set->impl->RMWCount = size; + rcl_ret_t rcl_wait_set_add_subscription( rcl_wait_set_t * wait_set, const rcl_subscription_t * subscription) { - SET_ADD(subscription, rmw_subscriptions.subscribers) + SET_ADD(subscription) + SET_ADD_RMW(subscription, rmw_subscriptions.subscribers) + return RCL_RET_OK; } rcl_ret_t rcl_wait_set_clear_subscriptions(rcl_wait_set_t * wait_set) { - SET_CLEAR( + SET_CLEAR(subscription) + SET_CLEAR_RMW( subscription, rmw_subscriptions.subscribers, rmw_subscriptions.subscriber_count) + return RCL_RET_OK; } rcl_ret_t @@ -235,8 +275,11 @@ rcl_wait_set_resize_subscriptions(rcl_wait_set_t * wait_set, size_t size) { SET_RESIZE( subscription, - rmw_subscriptions.subscribers, - rmw_subscriptions.subscriber_count) + SET_RESIZE_RMW_DEALLOC( + rmw_subscriptions.subscribers, rmw_subscriptions.subscriber_count), + SET_RESIZE_RMW_REALLOC( + subscription, rmw_subscriptions.subscribers, rmw_subscriptions.subscriber_count) + ) } rcl_ret_t @@ -244,16 +287,20 @@ rcl_wait_set_add_guard_condition( rcl_wait_set_t * wait_set, const rcl_guard_condition_t * guard_condition) { - SET_ADD(guard_condition, rmw_guard_conditions.guard_conditions) + SET_ADD(guard_condition) + SET_ADD_RMW(guard_condition, rmw_guard_conditions.guard_conditions) + return RCL_RET_OK; } rcl_ret_t rcl_wait_set_clear_guard_conditions(rcl_wait_set_t * wait_set) { - SET_CLEAR( + SET_CLEAR(guard_condition) + SET_CLEAR_RMW( guard_condition, rmw_guard_conditions.guard_conditions, rmw_guard_conditions.guard_condition_count) + return RCL_RET_OK; } rcl_ret_t @@ -261,8 +308,35 @@ rcl_wait_set_resize_guard_conditions(rcl_wait_set_t * wait_set, size_t size) { SET_RESIZE( guard_condition, - rmw_guard_conditions.guard_conditions, - rmw_guard_conditions.guard_condition_count) + SET_RESIZE_RMW_DEALLOC( + rmw_guard_conditions.guard_conditions, rmw_guard_conditions.guard_condition_count), + SET_RESIZE_RMW_REALLOC( + guard_condition, + rmw_guard_conditions.guard_conditions, + rmw_guard_conditions.guard_condition_count) + ) +} + +rcl_ret_t +rcl_wait_set_add_timer( + rcl_wait_set_t * wait_set, + const rcl_timer_t * timer) +{ + SET_ADD(timer) + return RCL_RET_OK; +} + +rcl_ret_t +rcl_wait_set_clear_timers(rcl_wait_set_t * wait_set) +{ + SET_CLEAR(timer) + return RCL_RET_OK; +} + +rcl_ret_t +rcl_wait_set_resize_timers(rcl_wait_set_t * wait_set, size_t size) +{ + SET_RESIZE(timer, ;, ;) } rcl_ret_t @@ -281,7 +355,24 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout) // Create dummy sets for currently unsupported wait-ables. static rmw_services_t dummy_services = {0, NULL}; static rmw_clients_t dummy_clients = {0, NULL}; - rmw_time_t rmw_timeout = rcl_time_from_int64_t_nanoseconds(timeout); + // Calculate the timeout. + int64_t min_timeout = timeout; + if (min_timeout == 0) { // Do not consider timer timeouts if non-blocking. + for (size_t i = 0; i < wait_set->size_of_timers; ++i) { + if (!wait_set->timers[i]) { + continue; // Skip NULL timers. + } + int64_t timer_timeout; + rcl_ret_t ret = rcl_timer_get_time_until_next_call(wait_set->timers[i], &timer_timeout); + if (ret != RCL_RET_OK) { + return ret; // The rcl error state should already be set. + } + if (timer_timeout < min_timeout) { + min_timeout = timer_timeout; + } + } + } + rmw_time_t rmw_timeout = rcl_time_from_int64_t_nanoseconds(min_timeout); // Wait. rmw_ret_t ret = rmw_wait( &wait_set->impl->rmw_subscriptions, @@ -290,6 +381,22 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout) &dummy_clients, &rmw_timeout ); + // Check for error. + if (ret != RMW_RET_OK) { + RCL_SET_ERROR_MSG(rmw_get_error_string_safe()); + return RCL_RET_ERROR; + } + // Check for ready timers next, and set not ready timers to NULL. + for (size_t i = 0; i < wait_set->size_of_timers; ++i) { + bool is_ready = false; + rcl_ret_t ret = rcl_timer_is_ready(wait_set->timers[i], &is_ready); + if (ret != RCL_RET_OK) { + return ret; // The rcl error state should already be set. + } + if (!is_ready) { + wait_set->timers[i] = NULL; + } + } // Check for timeout. if (ret == RMW_RET_TIMEOUT) { // Assume none were set (because timeout was reached first), and clear all. @@ -297,11 +404,6 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout) rcl_wait_set_clear_guard_conditions(wait_set); return RCL_RET_TIMEOUT; } - // Check for error. - if (ret != RMW_RET_OK) { - RCL_SET_ERROR_MSG(rmw_get_error_string_safe()); - return RCL_RET_ERROR; - } // Set corresponding rcl subscription handles NULL. for (size_t i = 0; i < wait_set->size_of_subscriptions; ++i) { assert(i < wait_set->impl->rmw_subscriptions.subscriber_count); // Defensive.