Merge pull request #97 from ros2/document_user_fns

Document user-facing code in rclcpp
This commit is contained in:
gerkey 2015-08-28 13:54:57 -07:00
commit 91a7a69b23
12 changed files with 366 additions and 18 deletions

View file

@ -36,6 +36,16 @@ namespace rclcpp
namespace executor namespace executor
{ {
/// Coordinate the order and timing of available communication tasks.
/**
* Executor provides spin functions (including spin_node_once and spin_some).
* It coordinates the nodes and callback groups by looking for available work and completing it,
* based on the threading or concurrency scheme provided by the subclass implementation.
* An example of available work is executing a subscription callback, or a timer callback.
* The executor structure allows for a decoupling of the communication graph and the execution
* model.
* See SingleThreadedExecutor and MultiThreadedExecutor for examples of execution paradigms.
*/
class Executor class Executor
{ {
friend class memory_strategy::MemoryStrategy; friend class memory_strategy::MemoryStrategy;
@ -43,6 +53,8 @@ class Executor
public: public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor); RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor);
/// Default constructor.
// \param[in] ms The memory strategy to be used with this executor.
explicit Executor(memory_strategy::MemoryStrategy::SharedPtr ms = explicit Executor(memory_strategy::MemoryStrategy::SharedPtr ms =
memory_strategy::create_default_strategy()) memory_strategy::create_default_strategy())
: interrupt_guard_condition_(rmw_create_guard_condition()), : interrupt_guard_condition_(rmw_create_guard_condition()),
@ -50,8 +62,10 @@ public:
{ {
} }
/// Default destructor.
virtual ~Executor() virtual ~Executor()
{ {
// Try to deallocate the interrupt guard condition.
if (interrupt_guard_condition_ != nullptr) { if (interrupt_guard_condition_ != nullptr) {
rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_); rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) { if (status != RMW_RET_OK) {
@ -61,8 +75,18 @@ public:
} }
} }
/// Do work periodically as it becomes available to us. Blocking call, may block indefinitely.
// It is up to the implementation of Executor to implement spin.
virtual void spin() = 0; virtual void spin() = 0;
/// Add a node to the executor.
/**
* An executor can have zero or more nodes which provide work during `spin` functions.
* \param[in] node_ptr Shared pointer to the node to be added.
* \param[in] notify True to trigger the interrupt guard condition during this function. If
* the executor is blocked at the rmw layer while waiting for work and it is notified that a new
* node was added, it will wake up.
*/
virtual void virtual void
add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify = true) add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify = true)
{ {
@ -84,6 +108,13 @@ public:
} }
} }
/// Remove a node from the executor.
/**
* \param[in] node_ptr Shared pointer to the node to remove.
* \param[in] notify True to trigger the interrupt guard condition and wake up the executor.
* This is useful if the last node was removed from the executor while the executor was blocked
* waiting for work in another thread, because otherwise the executor would never be notified.
*/
virtual void virtual void
remove_node(rclcpp::node::Node::SharedPtr & node_ptr, bool notify = true) remove_node(rclcpp::node::Node::SharedPtr & node_ptr, bool notify = true)
{ {
@ -108,6 +139,13 @@ public:
} }
} }
/// Add a node to executor, execute the next available unit of work, and remove the node.
/**
* \param[in] node Shared pointer to the node to add.
* \param[in] timeout How long to wait for work to become available. Negative values cause
* spin_node_once to block indefinitely (the default behavior). A timeout of 0 causes this
* function to be non-blocking.
*/
template<typename T = std::milli> template<typename T = std::milli>
void spin_node_once(rclcpp::node::Node::SharedPtr & node, void spin_node_once(rclcpp::node::Node::SharedPtr & node,
std::chrono::duration<int64_t, T> timeout = std::chrono::duration<int64_t, T>(-1)) std::chrono::duration<int64_t, T> timeout = std::chrono::duration<int64_t, T>(-1))
@ -121,6 +159,10 @@ public:
this->remove_node(node, false); this->remove_node(node, false);
} }
/// Add a node, complete all immediately available work, and remove the node.
/**
* \param[in] node Shared pointer to the node to add.
*/
void spin_node_some(rclcpp::node::Node::SharedPtr & node) void spin_node_some(rclcpp::node::Node::SharedPtr & node)
{ {
this->add_node(node, false); this->add_node(node, false);
@ -128,6 +170,13 @@ public:
this->remove_node(node, false); this->remove_node(node, false);
} }
/// Complete all available queued work without blocking.
/**
* This function can be overridden. The default implementation is suitable for a
* single-threaded model of execution.
* Adding subscriptions, timers, services, etc. with blocking callbacks will cause this function
* to block (which may have unintended consequences).
*/
virtual void spin_some() virtual void spin_some()
{ {
while (AnyExecutable::SharedPtr any_exec = while (AnyExecutable::SharedPtr any_exec =
@ -137,7 +186,12 @@ public:
} }
} }
// Support dynamic switching of memory strategy /// Support dynamic switching of the memory strategy.
/**
* Switching the memory strategy while the executor is spinning in another threading could have
* unintended consequences.
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
*/
void void
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy) set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
{ {
@ -148,6 +202,10 @@ public:
} }
protected: protected:
/// Find the next available executable and do the work associated with it.
/** \param[in] any_exec Union structure that can hold any executable type (timer, subscription,
* service, client).
*/
void void
execute_any_executable(AnyExecutable::SharedPtr & any_exec) execute_any_executable(AnyExecutable::SharedPtr & any_exec)
{ {
@ -908,8 +966,10 @@ protected:
return any_exec; return any_exec;
} }
/// Guard condition for signaling the rmw layer to wake up for special events.
rmw_guard_condition_t * interrupt_guard_condition_; rmw_guard_condition_t * interrupt_guard_condition_;
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
private: private:

View file

@ -36,17 +36,24 @@ namespace executors
namespace single_threaded_executor namespace single_threaded_executor
{ {
/// Single-threaded executor implementation
// This is the default executor created by rclcpp::spin.
class SingleThreadedExecutor : public executor::Executor class SingleThreadedExecutor : public executor::Executor
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(SingleThreadedExecutor); RCLCPP_SMART_PTR_DEFINITIONS(SingleThreadedExecutor);
/// Default constructor. See the default constructor for Executor.
SingleThreadedExecutor(memory_strategy::MemoryStrategy::SharedPtr ms = SingleThreadedExecutor(memory_strategy::MemoryStrategy::SharedPtr ms =
memory_strategy::create_default_strategy()) memory_strategy::create_default_strategy())
: executor::Executor(ms) {} : executor::Executor(ms) {}
/// Default destrcutor.
virtual ~SingleThreadedExecutor() {} virtual ~SingleThreadedExecutor() {}
/// Single-threaded implementation of spin.
// This function will block until work comes in, execute it, and keep blocking.
// It will only be interrupt by a CTRL-C (managed by the global signal handler).
void spin() void spin()
{ {
while (rclcpp::utilities::ok()) { while (rclcpp::utilities::ok()) {

View file

@ -30,6 +30,12 @@ class Executor;
namespace memory_strategy namespace memory_strategy
{ {
/// Delegate for handling memory allocations while the Executor is executing.
/**
* By default, the memory strategy dynamically allocates memory for structures that come in from
* the rmw implementation after the executor waits for work, based on the number of entities that
* come through.
*/
class MemoryStrategy class MemoryStrategy
{ {
@ -37,23 +43,44 @@ class MemoryStrategy
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(MemoryStrategy); RCLCPP_SMART_PTR_DEFINITIONS(MemoryStrategy);
/// Borrow memory for storing data for subscriptions, services, clients, or guard conditions.
/**
* The default implementation ignores the handle type and dynamically allocates the memory.
* \param[in] The type of entity that this function is requesting for.
* \param[in] The number of handles to borrow.
* \return Pointer to the allocated handles.
*/
virtual void ** borrow_handles(HandleType type, size_t number_of_handles) virtual void ** borrow_handles(HandleType type, size_t number_of_handles)
{ {
(void)type; (void)type;
return static_cast<void **>(alloc(sizeof(void *) * number_of_handles)); return static_cast<void **>(alloc(sizeof(void *) * number_of_handles));
} }
/// Return the memory borrowed in borrow_handles.
/**
* return_handles should always mirror the way memory was borrowed in borrow_handles.
* \param[in] The type of entity that this function is returning.
* \param[in] Pointer to the handles returned.
*/
virtual void return_handles(HandleType type, void ** handles) virtual void return_handles(HandleType type, void ** handles)
{ {
(void)type; (void)type;
this->free(handles); this->free(handles);
} }
/// Provide a newly initialized AnyExecutable object.
// \return Shared pointer to the fresh executable.
virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() virtual executor::AnyExecutable::SharedPtr instantiate_next_executable()
{ {
return std::make_shared<executor::AnyExecutable>(); return std::make_shared<executor::AnyExecutable>();
} }
/// Implementation of a general-purpose allocation function.
/**
* \param[in] size Number of bytes to allocate.
* \return Pointer to the allocated chunk of memory.
*/
virtual void * alloc(size_t size) virtual void * alloc(size_t size)
{ {
if (size == 0) { if (size == 0) {
@ -62,6 +89,10 @@ public:
return std::malloc(size); return std::malloc(size);
} }
/// Implementation of a general-purpose free.
/**
* \param[in] Pointer to deallocate.
*/
virtual void free(void * ptr) virtual void free(void * ptr)
{ {
return std::free(ptr); return std::free(ptr);

View file

@ -24,6 +24,8 @@ namespace rclcpp
namespace message_memory_strategy namespace message_memory_strategy
{ {
/// Default allocation strategy for messages received by subscriptions.
// A message memory strategy must be templated on the type of the subscription it belongs to.
template<typename MessageT> template<typename MessageT>
class MessageMemoryStrategy class MessageMemoryStrategy
{ {
@ -31,16 +33,21 @@ class MessageMemoryStrategy
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(MessageMemoryStrategy); RCLCPP_SMART_PTR_DEFINITIONS(MessageMemoryStrategy);
/// Default factory method
static SharedPtr create_default() static SharedPtr create_default()
{ {
return SharedPtr(new MessageMemoryStrategy<MessageT>); return SharedPtr(new MessageMemoryStrategy<MessageT>);
} }
/// By default, dynamically allocate a new message.
// \return Shared pointer to the new message.
virtual std::shared_ptr<MessageT> borrow_message() virtual std::shared_ptr<MessageT> borrow_message()
{ {
return std::shared_ptr<MessageT>(new MessageT); return std::shared_ptr<MessageT>(new MessageT);
} }
/// Release ownership of the message, which will deallocate it if it has no more owners.
// \param[in] Shared pointer to the message we are returning.
virtual void return_message(std::shared_ptr<MessageT> & msg) virtual void return_message(std::shared_ptr<MessageT> & msg)
{ {
msg.reset(); msg.reset();

View file

@ -56,11 +56,7 @@ class Executor;
namespace node namespace node
{ {
/// Node is the single point of entry for creating publishers and subscribers.
/* ROS Node Interface.
*
* This is the single point of entry for creating publishers and subscribers.
*/
class Node class Node
{ {
friend class rclcpp::executor::Executor; friend class rclcpp::executor::Executor;
@ -68,29 +64,55 @@ class Node
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(Node); RCLCPP_SMART_PTR_DEFINITIONS(Node);
/* Create a node based on the node name. */ /// Create a new node with the specified name.
/**
* \param[in] node_name Name of the node.
* \param[in] use_intra_process_comms True to use the optimized intra-process communication
* pipeline to pass messages between nodes in the same process using shared memory.
*/
Node(const std::string & node_name, bool use_intra_process_comms = false); Node(const std::string & node_name, bool use_intra_process_comms = false);
/* Create a node based on the node name and a rclcpp::context::Context. */
/// Create a node based on the node name and a rclcpp::context::Context.
/**
* \param[in] node_name Name of the node.
* \param[in] context The context for the node (usually represents the state of a process).
* \param[in] use_intra_process_comms True to use the optimized intra-process communication
* pipeline to pass messages between nodes in the same process using shared memory.
*/
Node( Node(
const std::string & node_name, rclcpp::context::Context::SharedPtr context, const std::string & node_name, rclcpp::context::Context::SharedPtr context,
bool use_intra_process_comms = false); bool use_intra_process_comms = false);
/* Get the name of the node. */ /// Get the name of the node.
// \return The name of the node.
const std::string & const std::string &
get_name() const {return name_; } get_name() const {return name_; }
/* Create and return a callback group. */ /// Create and return a callback group.
rclcpp::callback_group::CallbackGroup::SharedPtr rclcpp::callback_group::CallbackGroup::SharedPtr
create_callback_group(rclcpp::callback_group::CallbackGroupType group_type); create_callback_group(rclcpp::callback_group::CallbackGroupType group_type);
/* Create and return a Publisher. */ /// Create and return a Publisher.
/**
* \param[in] topic_name The topic for this publisher to publish on.
* \param[in] qos_profile The quality of service profile to pass on to the rmw implementation.
* \return Shared pointer to the created publisher.
*/
template<typename MessageT> template<typename MessageT>
rclcpp::publisher::Publisher::SharedPtr rclcpp::publisher::Publisher::SharedPtr
create_publisher( create_publisher(
const std::string & topic_name, const rmw_qos_profile_t & qos_profile); const std::string & topic_name, const rmw_qos_profile_t & qos_profile);
/* Create and return a Subscription. */ /// Create and return a Subscription.
/**
* \param[in] topic_name The topic to subscribe on.
* \param[in] qos_profile The quality of service profile to pass on to the rmw implementation.
* \param[in] callback The user-defined callback function.
* \param[in] group The callback group for this subscription. NULL for no callback group.
* \param[in] ignore_local_publications True to ignore local publications.
* \param[in] msg_mem_strat The message memory strategy to use for allocating messages.
* \return Shared pointer to the created subscription.
*/
/* TODO(jacquelinekay): /* TODO(jacquelinekay):
Windows build breaks when static member function passed as default Windows build breaks when static member function passed as default
argument to msg_mem_strat, nullptr is a workaround. argument to msg_mem_strat, nullptr is a workaround.
@ -117,13 +139,24 @@ public:
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
msg_mem_strat = nullptr); msg_mem_strat = nullptr);
/* Create a timer. */ /// Create a timer.
/**
* \param[in] period Time interval between triggers of the callback.
* \param[in] callback User-defined callback function.
* \param[in] group Callback group to execute this timer's callback in.
*/
rclcpp::timer::WallTimer::SharedPtr rclcpp::timer::WallTimer::SharedPtr
create_wall_timer( create_wall_timer(
std::chrono::nanoseconds period, std::chrono::nanoseconds period,
rclcpp::timer::CallbackType callback, rclcpp::timer::CallbackType callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr); rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
/// Create a timer with a sub-nanosecond precision update period.
/**
* \param[in] period Time interval between triggers of the callback.
* \param[in] callback User-defined callback function.
* \param[in] group Callback group to execute this timer's callback in.
*/
// TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it. // TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it.
// rclcpp::timer::WallTimer::SharedPtr // rclcpp::timer::WallTimer::SharedPtr
// create_wall_timer( // create_wall_timer(

View file

@ -40,6 +40,7 @@ class Node;
namespace publisher namespace publisher
{ {
/// A publisher publishes messages of any type to a topic.
class Publisher class Publisher
{ {
friend rclcpp::node::Node; friend rclcpp::node::Node;
@ -47,6 +48,15 @@ class Publisher
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(Publisher); RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
/// Default constructor.
/**
* Typically, a publisher is not created through this method, but instead is created through a
* call to `Node::create_publisher`.
* \param[in] node_handle The corresponding rmw representation of the owner node.
* \param[in] publisher_handle The rmw publisher handle corresponding to this publisher.
* \param[in] topic The topic that this publisher publishes on.
* \param[in] queue_size The maximum number of unpublished messages to queue.
*/
Publisher( Publisher(
std::shared_ptr<rmw_node_t> node_handle, std::shared_ptr<rmw_node_t> node_handle,
rmw_publisher_t * publisher_handle, rmw_publisher_t * publisher_handle,
@ -66,6 +76,7 @@ public:
} }
} }
/// Default destructor.
virtual ~Publisher() virtual ~Publisher()
{ {
if (intra_process_publisher_handle_) { if (intra_process_publisher_handle_) {
@ -86,6 +97,11 @@ public:
} }
} }
/// Send a message to the topic for this publisher.
/**
* This function is templated on the input message type, MessageT.
* \param[in] msg A shared pointer to the message to send.
*/
template<typename MessageT> template<typename MessageT>
void void
publish(std::unique_ptr<MessageT> & msg) publish(std::unique_ptr<MessageT> & msg)
@ -155,36 +171,56 @@ public:
return this->publish(unique_msg); return this->publish(unique_msg);
} }
/// Get the topic that this publisher publishes on.
// \return The topic name.
const std::string & const std::string &
get_topic_name() const get_topic_name() const
{ {
return topic_; return topic_;
} }
/// Get the queue size for this publisher.
// \return The queue size.
size_t size_t
get_queue_size() const get_queue_size() const
{ {
return queue_size_; return queue_size_;
} }
/// Get the global identifier for this publisher (used in rmw and by DDS).
// \return The gid.
const rmw_gid_t & const rmw_gid_t &
get_gid() const get_gid() const
{ {
return rmw_gid_; return rmw_gid_;
} }
/// Get the global identifier for this publisher used by intra-process communication.
// \return The intra-process gid.
const rmw_gid_t & const rmw_gid_t &
get_intra_process_gid() const get_intra_process_gid() const
{ {
return intra_process_rmw_gid_; return intra_process_rmw_gid_;
} }
/// Compare this publisher to a gid.
/**
* Note that this function calls the next function.
* \param[in] gid Reference to a gid.
* \return True if the publisher's gid matches the input.
*/
bool bool
operator==(const rmw_gid_t & gid) const operator==(const rmw_gid_t & gid) const
{ {
return *this == &gid; return *this == &gid;
} }
/// Compare this publisher to a pointer gid.
/**
* A wrapper for comparing this publisher's gid to the input using rmw_compare_gids_equal.
* \param[in] gid A pointer to a gid.
* \return True if this publisher's gid matches the input.
*/
bool bool
operator==(const rmw_gid_t * gid) const operator==(const rmw_gid_t * gid) const
{ {

View file

@ -63,6 +63,8 @@ operator"" _ns(long double ns)
std::chrono::duration<long double, std::nano>(ns)); std::chrono::duration<long double, std::nano>(ns));
} }
// Namespace escalations.
// For example, this next line escalates type "rclcpp:node::Node" to "rclcpp::Node"
using rclcpp::node::Node; using rclcpp::node::Node;
using rclcpp::publisher::Publisher; using rclcpp::publisher::Publisher;
using rclcpp::subscription::SubscriptionBase; using rclcpp::subscription::SubscriptionBase;
@ -78,12 +80,16 @@ using rclcpp::utilities::shutdown;
using rclcpp::utilities::init; using rclcpp::utilities::init;
using rclcpp::utilities::sleep_for; using rclcpp::utilities::sleep_for;
/// Create a default single-threaded executor and execute any immediately available work.
// \param[in] node_ptr Shared pointer to the node to spin.
void spin_some(Node::SharedPtr & node_ptr) void spin_some(Node::SharedPtr & node_ptr)
{ {
rclcpp::executors::SingleThreadedExecutor executor; rclcpp::executors::SingleThreadedExecutor executor;
executor.spin_node_some(node_ptr); executor.spin_node_some(node_ptr);
} }
/// Create a default single-threaded executor and spin the specified node.
// \param[in] node_ptr Shared pointer to the node to spin.
void spin(Node::SharedPtr & node_ptr) void spin(Node::SharedPtr & node_ptr)
{ {
rclcpp::executors::SingleThreadedExecutor executor; rclcpp::executors::SingleThreadedExecutor executor;

View file

@ -25,6 +25,13 @@ namespace strategies
namespace message_pool_memory_strategy namespace message_pool_memory_strategy
{ {
/// Completely static memory allocation strategy for messages.
/**
* Templated on the type of message pooled by this class and the size of the message pool.
* Templating allows the program to determine the memory required for this object at compile time.
* The size of the message pool should be at least the largest number of concurrent accesses to
* the subscription (usually the number of threads).
*/
template<typename MessageT, size_t Size, template<typename MessageT, size_t Size,
typename std::enable_if<rosidl_generator_traits::has_fixed_size<MessageT>::value>::type * = typename std::enable_if<rosidl_generator_traits::has_fixed_size<MessageT>::value>::type * =
nullptr> nullptr>
@ -33,6 +40,8 @@ class MessagePoolMemoryStrategy
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(MessagePoolMemoryStrategy); RCLCPP_SMART_PTR_DEFINITIONS(MessagePoolMemoryStrategy);
/// Default constructor
MessagePoolMemoryStrategy() MessagePoolMemoryStrategy()
: next_array_index_(0) : next_array_index_(0)
{ {
@ -42,6 +51,12 @@ public:
} }
} }
/// Borrow a message from the message pool.
/**
* Manage the message pool ring buffer.
* Throw an exception if the next message was not available.
* \return Shared pointer to the borrowed message.
*/
std::shared_ptr<MessageT> borrow_message() std::shared_ptr<MessageT> borrow_message()
{ {
size_t current_index = next_array_index_; size_t current_index = next_array_index_;
@ -56,6 +71,11 @@ public:
return pool_[current_index].msg_ptr_; return pool_[current_index].msg_ptr_;
} }
/// Return a message to the message pool.
/**
* Manage metadata in the message pool ring buffer to release the message.
* \param[in] msg Shared pointer to the message to return.
*/
void return_message(std::shared_ptr<MessageT> & msg) void return_message(std::shared_ptr<MessageT> & msg)
{ {
for (size_t i = 0; i < Size; ++i) { for (size_t i = 0; i < Size; ++i) {

View file

@ -27,6 +27,8 @@ namespace memory_strategies
namespace static_memory_strategy namespace static_memory_strategy
{ {
/// Representation of the upper bounds on the memory pools managed by StaticMemoryStrategy.
struct ObjectPoolBounds struct ObjectPoolBounds
{ {
public: public:
@ -37,41 +39,74 @@ public:
size_t max_guard_conditions; size_t max_guard_conditions;
size_t pool_size; size_t pool_size;
/// Default constructor attempts to set reasonable default limits on the fields.
ObjectPoolBounds() ObjectPoolBounds()
: max_subscriptions(10), max_services(10), max_clients(10), : max_subscriptions(10), max_services(10), max_clients(10),
max_executables(1), max_guard_conditions(2), pool_size(1024) max_executables(1), max_guard_conditions(2), pool_size(1024)
{} {}
// Setters implement named parameter idiom/method chaining
/// Set the maximum number of subscriptions.
/**
* \param[in] subscriptions Maximum number of subscriptions.
* \return Reference to this object, for method chaining.
*/
ObjectPoolBounds & set_max_subscriptions(size_t subscriptions) ObjectPoolBounds & set_max_subscriptions(size_t subscriptions)
{ {
max_subscriptions = subscriptions; max_subscriptions = subscriptions;
return *this; return *this;
} }
/// Set the maximum number of services.
/**
* \param[in] services Maximum number of services.
* \return Reference to this object, for method chaining.
*/
ObjectPoolBounds & set_max_services(size_t services) ObjectPoolBounds & set_max_services(size_t services)
{ {
max_services = services; max_services = services;
return *this; return *this;
} }
/// Set the maximum number of clients.
/**
* \param[in] clients Maximum number of clients.
* \return Reference to this object, for method chaining.
*/
ObjectPoolBounds & set_max_clients(size_t clients) ObjectPoolBounds & set_max_clients(size_t clients)
{ {
max_clients = clients; max_clients = clients;
return *this; return *this;
} }
/// Set the maximum number of guard conditions.
/**
* \param[in] guard conditions Maximum number of guard conditions.
* \return Reference to this object, for method chaining.
*/
ObjectPoolBounds & set_max_guard_conditions(size_t guard_conditions) ObjectPoolBounds & set_max_guard_conditions(size_t guard_conditions)
{ {
max_guard_conditions = guard_conditions; max_guard_conditions = guard_conditions;
return *this; return *this;
} }
/// Set the maximum number of executables.
/**
* \param[in] executables Maximum number of executables.
* \return Reference to this object, for method chaining.
*/
ObjectPoolBounds & set_max_executables(size_t executables) ObjectPoolBounds & set_max_executables(size_t executables)
{ {
max_executables = executables; max_executables = executables;
return *this; return *this;
} }
/// Set the maximum memory pool size.
/**
* \param[in] executables Maximum memory pool size.
* \return Reference to this object, for method chaining.
*/
ObjectPoolBounds & set_memory_pool_size(size_t pool) ObjectPoolBounds & set_memory_pool_size(size_t pool)
{ {
pool_size = pool; pool_size = pool;
@ -80,9 +115,20 @@ public:
}; };
/// Static memory allocation alternative to the default memory strategy.
/**
* The name is a bit of a misnomer. The memory managed by this class is actually allocated
* dynamically in the constructor, but no subsequent accesses to the class (besides the destructor)
* allocate or free memory.
* StaticMemoryStrategy puts a hard limit on the number of subscriptions, etc. that can be executed
* in one iteration of `Executor::spin`. Thus it allows for memory allocation optimization for
* situations where a limit on the number of such entities is known.
*/
class StaticMemoryStrategy : public memory_strategy::MemoryStrategy class StaticMemoryStrategy : public memory_strategy::MemoryStrategy
{ {
public: public:
/// Default constructor.
// \param[in] bounds Representation of the limits on memory managed by this class.
StaticMemoryStrategy(ObjectPoolBounds bounds = ObjectPoolBounds()) StaticMemoryStrategy(ObjectPoolBounds bounds = ObjectPoolBounds())
: bounds_(bounds), memory_pool_(nullptr), subscription_pool_(nullptr), : bounds_(bounds), memory_pool_(nullptr), subscription_pool_(nullptr),
service_pool_(nullptr), guard_condition_pool_(nullptr), executable_pool_(nullptr) service_pool_(nullptr), guard_condition_pool_(nullptr), executable_pool_(nullptr)
@ -130,6 +176,7 @@ public:
} }
} }
/// Default destructor. Free all allocated memory.
~StaticMemoryStrategy() ~StaticMemoryStrategy()
{ {
if (bounds_.pool_size) { if (bounds_.pool_size) {
@ -149,6 +196,12 @@ public:
} }
} }
/// Borrow handles by returning a pointer to the preallocated object pool for the specified type.
/**
* \param[in] The type of entity that this function is requesting for.
* \param[in] The number of handles to borrow.
* \return Pointer to the allocated handles.
*/
void ** borrow_handles(HandleType type, size_t number_of_handles) void ** borrow_handles(HandleType type, size_t number_of_handles)
{ {
switch (type) { switch (type) {
@ -182,6 +235,11 @@ public:
throw std::runtime_error("Unrecognized enum, could not borrow handle memory."); throw std::runtime_error("Unrecognized enum, could not borrow handle memory.");
} }
/// Return the borrowed handles by clearing the object pool for the correspondign type.
/**
* \param[in] The type of entity that this function is returning.
* \param[in] Pointer to the handles returned.
*/
void return_handles(HandleType type, void ** handles) void return_handles(HandleType type, void ** handles)
{ {
(void)handles; (void)handles;
@ -211,10 +269,12 @@ public:
} }
} }
/// Instantiate the next executable by borrowing space from the preallocated executables pool.
// \return Shared pointer to the executable.
executor::AnyExecutable::SharedPtr instantiate_next_executable() executor::AnyExecutable::SharedPtr instantiate_next_executable()
{ {
if (exec_seq_ >= bounds_.max_executables) { if (exec_seq_ >= bounds_.max_executables) {
// wrap around // wrap around (ring buffer logic)
exec_seq_ = 0; exec_seq_ = 0;
} }
size_t prev_exec_seq_ = exec_seq_; size_t prev_exec_seq_ = exec_seq_;
@ -224,6 +284,7 @@ public:
throw std::runtime_error("Executable pool member was NULL"); throw std::runtime_error("Executable pool member was NULL");
} }
// Make sure to clear the executable fields.
executable_pool_[prev_exec_seq_]->subscription.reset(); executable_pool_[prev_exec_seq_]->subscription.reset();
executable_pool_[prev_exec_seq_]->timer.reset(); executable_pool_[prev_exec_seq_]->timer.reset();
executable_pool_[prev_exec_seq_]->service.reset(); executable_pool_[prev_exec_seq_]->service.reset();
@ -234,6 +295,11 @@ public:
return executable_pool_[prev_exec_seq_]; return executable_pool_[prev_exec_seq_];
} }
/// General allocate: reserve space in the memory pool reserved by this function.
/**
* \param[in] size Number of bytes to allocate.
* \return Pointer to the allocated chunk of memory.
*/
void * alloc(size_t size) void * alloc(size_t size)
{ {
// Extremely naive static allocation strategy // Extremely naive static allocation strategy
@ -253,6 +319,10 @@ public:
return memory_pool_[prev_pool_seq]; return memory_pool_[prev_pool_seq];
} }
/// Release the allocated memory in the memory pool.
/**
* \param[in] Pointer to deallocate.
*/
void free(void * ptr) void free(void * ptr)
{ {
if (memory_map_.count(ptr) == 0) { if (memory_map_.count(ptr) == 0) {

View file

@ -46,6 +46,8 @@ class Node;
namespace subscription namespace subscription
{ {
/// Virtual base class for subscriptions. This pattern allows us to iterate over different template
/// specializations of Subscription, among other things.
class SubscriptionBase class SubscriptionBase
{ {
friend class rclcpp::executor::Executor; friend class rclcpp::executor::Executor;
@ -53,6 +55,12 @@ class SubscriptionBase
public: public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase); RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase);
/// Default constructor.
/**
* \param[in] node_handle The rmw representation of the node that owns this subscription.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] ignore_local_publications True to ignore local publications (unused).
*/
SubscriptionBase( SubscriptionBase(
std::shared_ptr<rmw_node_t> node_handle, std::shared_ptr<rmw_node_t> node_handle,
rmw_subscription_t * subscription_handle, rmw_subscription_t * subscription_handle,
@ -68,6 +76,7 @@ public:
(void)ignore_local_publications_; (void)ignore_local_publications_;
} }
/// Default destructor.
virtual ~SubscriptionBase() virtual ~SubscriptionBase()
{ {
if (subscription_handle_) { if (subscription_handle_) {
@ -89,15 +98,26 @@ public:
} }
} }
/// Get the topic that this subscription is subscribed on.
const std::string & get_topic_name() const const std::string & get_topic_name() const
{ {
return this->topic_name_; return this->topic_name_;
} }
/// Borrow a new message.
// \return Shared pointer to the fresh message.
virtual std::shared_ptr<void> create_message() = 0; virtual std::shared_ptr<void> create_message() = 0;
/// Check if we need to handle the message, and execute the callback if we do.
/**
* \param[in] message Shared pointer to the message to handle.
* \param[in] message_info Metadata associated with this message.
*/
virtual void handle_message( virtual void handle_message(
std::shared_ptr<void> & message, std::shared_ptr<void> & message,
const rmw_message_info_t & message_info) = 0; const rmw_message_info_t & message_info) = 0;
/// Return the message borrowed in create_message.
// \param[in] Shared pointer to the returned message.
virtual void return_message(std::shared_ptr<void> & message) = 0; virtual void return_message(std::shared_ptr<void> & message) = 0;
virtual void handle_intra_process_message( virtual void handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm, rcl_interfaces::msg::IntraProcessMessage & ipm,
@ -120,6 +140,7 @@ private:
using namespace any_subscription_callback; using namespace any_subscription_callback;
/// Subscription implementation, templated on the type of message this subscription receives.
template<typename MessageT> template<typename MessageT>
class Subscription : public SubscriptionBase class Subscription : public SubscriptionBase
{ {
@ -128,6 +149,16 @@ class Subscription : public SubscriptionBase
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(Subscription); RCLCPP_SMART_PTR_DEFINITIONS(Subscription);
/// Default constructor.
/**
* The constructor for a subscription is almost never called directly. Instead, subscriptions
* should be instantiated through Node::create_subscription.
* \param[in] node_handle rmw representation of the node that owns this subscription.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] ignore_local_publications True to ignore local publications (unused).
* \param[in] callback User-defined callback to call when a message is received.
* \param[in] memory_strategy The memory strategy to be used for managing message memory.
*/
Subscription( Subscription(
std::shared_ptr<rmw_node_t> node_handle, std::shared_ptr<rmw_node_t> node_handle,
rmw_subscription_t * subscription_handle, rmw_subscription_t * subscription_handle,
@ -143,14 +174,22 @@ public:
matches_any_intra_process_publishers_(nullptr) matches_any_intra_process_publishers_(nullptr)
{} {}
/// Support dynamically setting the message memory strategy.
/**
* Behavior may be undefined if called while the subscription could be executing.
* \param[in] message_memory_strategy Shared pointer to the memory strategy to set.
*/
void set_message_memory_strategy( void set_message_memory_strategy(
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr message_memory_strategy) typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr message_memory_strategy)
{ {
message_memory_strategy_ = message_memory_strategy; message_memory_strategy_ = message_memory_strategy;
} }
std::shared_ptr<void> create_message() std::shared_ptr<void> create_message()
{ {
/* The default message memory strategy provides a dynamically allocated message on each call to
* create_message, though alternative memory strategies that re-use a preallocated message may be
* used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
*/
return message_memory_strategy_->borrow_message(); return message_memory_strategy_->borrow_message();
} }

View file

@ -66,12 +66,21 @@ public:
this->canceled_ = true; this->canceled_ = true;
} }
/// Check how long the timer has until its next scheduled callback.
// \return A std::chrono::duration representing the relative time until the next callback.
virtual std::chrono::nanoseconds virtual std::chrono::nanoseconds
time_until_trigger() = 0; time_until_trigger() = 0;
/// Is the clock steady (i.e. is the time between ticks constant?)
// \return True if the clock used by this timer is steady.
virtual bool is_steady() = 0; virtual bool is_steady() = 0;
// Interface for externally triggering the timer event /// Check if the timer needs to trigger the callback.
/**
* This function expects its caller to immediately trigger the callback after this function,
* since it maintains the last time the callback was triggered.
* \return True if the timer needs to trigger.
*/
virtual bool check_and_trigger() = 0; virtual bool check_and_trigger() = 0;
protected: protected:
@ -82,6 +91,7 @@ protected:
}; };
/// Generic timer templated on the clock type. Periodically executes a user-specified callback.
template<class Clock = std::chrono::high_resolution_clock> template<class Clock = std::chrono::high_resolution_clock>
class GenericTimer : public TimerBase class GenericTimer : public TimerBase
{ {
@ -90,6 +100,11 @@ class GenericTimer : public TimerBase
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(GenericTimer); RCLCPP_SMART_PTR_DEFINITIONS(GenericTimer);
/// Default constructor.
/**
* \param[in] period The interval at which the timer fires.
* \param[in] callback User-specified callback function.
*/
GenericTimer(std::chrono::nanoseconds period, CallbackType callback) GenericTimer(std::chrono::nanoseconds period, CallbackType callback)
: TimerBase(period, callback), loop_rate_(period) : TimerBase(period, callback), loop_rate_(period)
{ {
@ -98,12 +113,13 @@ public:
last_triggered_time_ = Clock::now() - period; last_triggered_time_ = Clock::now() - period;
} }
/// Default destructor.
virtual ~GenericTimer() virtual ~GenericTimer()
{ {
// Stop the timer from running.
cancel(); cancel();
} }
// return: true to trigger callback on the next "execute_timer" call in executor
bool bool
check_and_trigger() check_and_trigger()
{ {

View file

@ -38,10 +38,14 @@
namespace namespace
{ {
/// Represent the status of the global interrupt signal.
volatile sig_atomic_t g_signal_status = 0; volatile sig_atomic_t g_signal_status = 0;
/// Guard condition for interrupting the rmw implementation when the global interrupt signal fired.
rmw_guard_condition_t * g_sigint_guard_cond_handle = \ rmw_guard_condition_t * g_sigint_guard_cond_handle = \
rmw_create_guard_condition(); rmw_create_guard_condition();
/// Condition variable for timed sleep (see sleep_for).
std::condition_variable g_interrupt_condition_variable; std::condition_variable g_interrupt_condition_variable;
/// Mutex for protecting the global condition variable.
std::mutex g_interrupt_mutex; std::mutex g_interrupt_mutex;
#ifdef HAS_SIGACTION #ifdef HAS_SIGACTION
@ -50,6 +54,11 @@ struct sigaction old_action;
void (* old_signal_handler)(int) = 0; void (* old_signal_handler)(int) = 0;
#endif #endif
/// Handle the interrupt signal.
/** When the interrupt signal fires, the signal handler notifies the condition variable to wake up
* and triggers the interrupt guard condition, so that all global threads managed by rclcpp
* are interrupted.
*/
void void
#ifdef HAS_SIGACTION #ifdef HAS_SIGACTION
signal_handler(int signal_value, siginfo_t * siginfo, void * context) signal_handler(int signal_value, siginfo_t * siginfo, void * context)
@ -98,6 +107,11 @@ RMW_THREAD_LOCAL size_t thread_id = 0;
namespace utilities namespace utilities
{ {
/// Initialize communications via the rmw implementation and set up a global signal handler.
/**
* \param[in] argc Number of arguments.
* \param[in] argv Argument vector. Will eventually be used for passing options to rclcpp.
*/
void void
init(int argc, char * argv[]) init(int argc, char * argv[])
{ {
@ -138,12 +152,15 @@ init(int argc, char * argv[])
} }
} }
/// Check rclcpp's status.
// \return True if SIGINT hasn't fired yet, false otherwise.
bool bool
ok() ok()
{ {
return ::g_signal_status == 0; return ::g_signal_status == 0;
} }
/// Notify the signal handler and rmw that rclcpp is shutting down.
void void
shutdown() shutdown()
{ {
@ -157,12 +174,18 @@ shutdown()
} }
/// Get a handle to the rmw guard condition that manages the signal handler.
rmw_guard_condition_t * rmw_guard_condition_t *
get_global_sigint_guard_condition() get_global_sigint_guard_condition()
{ {
return ::g_sigint_guard_cond_handle; return ::g_sigint_guard_cond_handle;
} }
/// Use the global condition variable to block for the specified amount of time.
/**
* \param[in] nanoseconds A std::chrono::duration representing how long to sleep for.
* \return True if the condition variable did not timeout.
*/
bool bool
sleep_for(const std::chrono::nanoseconds & nanoseconds) sleep_for(const std::chrono::nanoseconds & nanoseconds)
{ {